我在项目中使用 confluence-kafka Python 客户端。我正在尝试使用此客户端创建 Docker 映像。
我面临以下错误:-
#11 8.015 [pipenv.exceptions.InstallError]: In file included from /tmp/pip-install-so_whhii/confluent-kafka_9d9553bf46cf489bb25fcb2ac7698747/src/confluent_kafka/src/Admin.c:17:
#11 8.015 [pipenv.exceptions.InstallError]: /tmp/pip-install-so_whhii/confluent-kafka_9d9553bf46cf489bb25fcb2ac7698747/src/confluent_kafka/src/confluent_kafka.h:23:10: fatal error: librdkafka/rdkafka.h: No such file or directory
#11 8.015 [pipenv.exceptions.InstallError]: 23 | #include <librdkafka/rdkafka.h>
#11 8.015 [pipenv.exceptions.InstallError]: | ^~~~~~~~~~~~~~~~~~~~~~
#11 8.015 [pipenv.exceptions.InstallError]: compilation terminated.
#11 8.015 [pipenv.exceptions.InstallError]: error: command '/usr/bin/gcc' failed with exit code 1
#11 8.016 [pipenv.exceptions.InstallError]: [end of output]
Run Code Online (Sandbox Code Playgroud)
根据我的搜索,它与 librdkafka 的 Apple M1 版本相关。
python apache-kafka docker librdkafka confluent-kafka-python
我是 Kafka 新手,使用融合的 kafka 并尝试使用带有 'sasl.mechanism': 'PLAIN','security.protocol': 'SASL_SSL' 的 python 生产者代码从 AWS EC2 实例将消息写入现有的 kafka 主题。我尝试过此链接中的生产者示例。出现以下错误。如果有任何可以指导我什么会导致这个问题将会有很大的帮助。
Failed to deliver message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Run Code Online (Sandbox Code Playgroud) 我尝试使用confluence-kafka python 库通过 lambda 函数管理我的集群,但该函数失败并出现错误:
"Unable to import module 'Test': No module named 'confluent_kafka.cimpl'"
Run Code Online (Sandbox Code Playgroud)
我的要求.txt
requests
confluent-kafka
Run Code Online (Sandbox Code Playgroud)
为了创建 zip 文件,我将代码移动到虚拟环境的 site-packages 位置并压缩所有内容。
Python代码:
import confluent_kafka.admin
import requests
def lambda_handler(event, context):
print("Hello World")
Run Code Online (Sandbox Code Playgroud)
我使用的是 macOS 10.X。在 Linux 上,我注意到 pip install 创建了一个单独的 confluence_kafka.libs,它不会在 mac 上创建
python amazon-web-services aws-lambda confluent-kafka-python
我的虚拟机在 ubuntu 18.04 上运行。我在上面安装了:
from .cimpl import (Consumer, #noqa
ModuleNotFoundError: no module named 'confluent_kafka.cimpl'
Run Code Online (Sandbox Code Playgroud)
我尝试以多种方式运行/安装/删除,但仍然出现此错误。
到目前为止,这是我尝试过的:
from confluent_kafka import Consumer
c = Consumer({... several security/server settings skipped...
'auto.offset.reset': 'beginning',
'group.id': 'my-group'})
c.subscribe(['my.topic'])
msg = poll(30.0) # msg is of None type.
Run Code Online (Sandbox Code Playgroud)
msg几乎总是最终成为None这样。我认为问题可能是'my-group'已经消耗了所有消息'my.topic'......但我不在乎消息是否已经被消耗 - 我仍然需要最新的消息。具体来说,我需要最新消息的时间戳。
我又尝试了一些,从这里看来,该主题中可能有 25 条消息,但我不知道如何获取它们:
a = c.assignment()
print(a) # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets) # Outputs: (25, 25)
Run Code Online (Sandbox Code Playgroud)
如果因为该主题从未写入任何内容而没有消息,我该如何确定?如果是这样,我如何确定该主题存在了多长时间?我正在编写一个脚本,自动删除过去 X 天内未写入的任何主题(最初为 14 个 - 可能会随着时间的推移进行调整。)
python apache-kafka kafka-consumer-api confluent-platform confluent-kafka-python
我的卡夫卡消费者有问题。当我在本地主机中运行我的消费者时,他运行正确,但是当我在另一个环境中运行我的消费者时,他显示此错误
%6|1629836068.968|rdkafka#consumer-1| [thrd:ssl://kafka-events:11101/12101]:ssl://kafka-events:11101/12101:已断开连接(处于状态 UP 684744 毫秒后)
我不知道这个错误的根本原因是什么。
我的 kafka 消费者是用 python 3 开发的,具有汇合的 kafka DeserializingConsumer。
这个问题类似于Python KafkaConsumer start 消费来自时间戳的消息,除了我想知道如何在 Confluence 的官方 Python Kafka 客户端中执行此操作。
我研究了Consumer.offsets_for_times函数,但我对它接受TopicPartition.offset字段中的时间戳感到困惑。
a 如何offset等同于时间戳?
使用现有的 Java 示例,我尝试使用 python-kafka 和 confluence_kafka 库编写一个与生成器等效的 python 版本。如何使用类似于下面 Java 中的信息在 python 中配置 sasl.jass.config?
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
...
Properties props = new Properties();
...
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<Kafka_Username>\" password=\"<Kafka_Password>\";");
Producer<String, String> producer = new KafkaProducer<>(props);
Run Code Online (Sandbox Code Playgroud)