我正在尝试使用 kafka,MultiProcessConsumer但出现以下错误。似乎错误与python中的多线程有关
这是我正在使用的代码。
simple.py
from kafka import SimpleProducer, SimpleClient, SimpleConsumer, MultiProcessConsumer
# To consume messages
client = SimpleClient('localhost:9092')
consumer = MultiProcessConsumer(client, "my-group", "testing_topic", num_procs=3)
for message in consumer:
# message is raw byte string -- decode if necessary!
# e.g., for unicode: `message.decode('utf-8')`
print(message)
client.close()
Run Code Online (Sandbox Code Playgroud)
运行上述代码时出错。
$ python simple.py
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/var/users/ec2-user/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/managers.py", line 749, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above …Run Code Online (Sandbox Code Playgroud) python python-3.x apache-kafka kafka-consumer-api kafka-python
如果生成的记录失败,我想设置要触发的回调。最初,我只想记录失败的记录。
Confluent Kafka python 库提供了一种添加回调的机制:
produce(topic[, value][, key][, partition][, on_delivery][, timestamp])
...
on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
Run Code Online (Sandbox Code Playgroud)
如何使用 kafka-python kafka.KafkaProducer#send()实现类似的行为,而不必使用已弃用的 SimpleClient 使用 kafka.SimpleClient#send_produce_request()
我有一个消费者脚本,它处理每条消息并手动向主题提交偏移量。
CONSUMER = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_SERVER],
auto_offset_reset="earliest",
max_poll_records=100,
enable_auto_commit=False,
group_id=CONSUMER_GROUP,
# Use the RoundRobinPartition method
partition_assignment_strategy=[RoundRobinPartitionAssignor],
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
while True:
count += 1
LOGGER.info("--------------Poll {0}---------".format(count))
for msg in CONSUMER:
# Process msg.value
# Commit offset to topic
tp = TopicPartition(msg.topic, msg.partition)
offsets = {tp: OffsetAndMetadata(msg.offset, None)}
CONSUMER.commit(offsets=offsets)
Run Code Online (Sandbox Code Playgroud)
处理每条消息所需的时间 < 1 秒。
我收到此错误错误:
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用来自 Kafka 主题的消息。我在confluent_kafka消费者周围使用包装器。在开始使用消息之前,我需要检查是否建立了连接。
我读到消费者很懒惰,所以我需要执行一些操作才能建立连接。但是我想在不执行consumeorpoll操作的情况下检查连接建立。
此外,我尝试给出一些错误的配置,以查看民意调查的反应是什么。我得到的回应是:
b'Broker: No more messages'
Run Code Online (Sandbox Code Playgroud)
那么,如何判断是连接参数错误、连接中断还是主题中实际上没有消息?
我将在我的应用程序中使用 Kafka 作为消息代理。此应用程序完全使用 Python 编写。对于此应用程序的一部分(登录和身份验证),我需要实现一个请求-回复消息传递系统。换句话说,生产者需要同步地从消费者那里得到生产消息的响应。使用 Kafka 及其 Python 库 (kafka-python, ...) 是否可行?
我正在使用Python Kafka主题.
是否有任何提供生产者可以更新Kafka队列中的消息并将其再次附加到队列的顶部?
根据卡夫卡的规范,这似乎不可行.
我是卡夫卡新手。我们正在尝试将数据从 csv 文件导入到 Kafka。我们需要每天导入,同时前一天的数据已被废弃。如何在Python中删除Kafka主题下的所有消息?或者如何删除 python 中的 Kafka 主题?或者我看到有人建议等待数据过期,如果可以的话我该如何设置数据过期时间?任何建议将不胜感激!
谢谢
使用kafka-python-1.0.2。
如果我有一个包含10个分区的主题,那么如何遍历各个分区和消息,同时提交一个特定的分区。我似乎无法在文档中或其他任何地方找到此示例
从文档中,我想使用:
consumer.commit(offset=offsets)
具体来说,如何创建偏移量所需的分区和OffsetAndMetadata字典(字典,可选)– {TopicPartition:OffsetAndMetadata}。
我希望函数调用就像这样:
consumer.commit(partition, offset)
但这似乎并非如此。
提前致谢。
我有一个应用程序,用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容。我创建了一个有 5 个分区的主题,并且有 5 个 kafka 消费者。但是,网页下载的超时时间为 60 秒。当其中一个 url 被下载时,服务器假定消息丢失并将数据重新发送给不同的消费者。
我已经尝试了中提到的所有内容
和
https://github.com/spring-projects/spring-kafka/issues/202
但我每次都会收到不同的错误。
是否可以将特定消费者与 kafka 中的分区联系起来?我正在为我的应用程序使用 kafka-python
我是 python 中 kafka 客户端的初学者,我需要一些帮助来描述使用客户端的主题。
我能够使用以下代码列出我所有的 kafka 主题:-
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['kafka1'])
topicList = consumer.topics()
Run Code Online (Sandbox Code Playgroud)