标签: kafka-python

Python`atexit._run_exitfuncs 中的错误

我正在尝试使用 kafka,MultiProcessConsumer但出现以下错误。似乎错误与python中的多线程有关

这是我正在使用的代码。

simple.py

from kafka import SimpleProducer, SimpleClient, SimpleConsumer, MultiProcessConsumer

# To consume messages
client = SimpleClient('localhost:9092')
consumer = MultiProcessConsumer(client, "my-group", "testing_topic", num_procs=3)
for message in consumer:
    # message is raw byte string -- decode if necessary!
    # e.g., for unicode: `message.decode('utf-8')`
    print(message)

client.close()
Run Code Online (Sandbox Code Playgroud)

运行上述代码时出错。

$ python simple.py 
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/var/users/ec2-user/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/managers.py", line 749, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above …
Run Code Online (Sandbox Code Playgroud)

python python-3.x apache-kafka kafka-consumer-api kafka-python

5
推荐指数
0
解决办法
3010
查看次数

如何为 kafka-python kafka.KafkaProducer#send() 添加失败回调?

如果生成的记录失败,我想设置要触发的回调。最初,我只想记录失败的记录。

Confluent Kafka python 库提供了一种添加回调的机制:

produce(topic[, value][, key][, partition][, on_delivery][, timestamp])
...
    on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
Run Code Online (Sandbox Code Playgroud)

如何使用 kafka-python kafka.KafkaProducer#send()实现类似的行为,而不必使用已弃用的 SimpleClient 使用 kafka.SimpleClient#send_produce_request()

kafka-python

5
推荐指数
1
解决办法
3093
查看次数

手动提交偏移量到 kafka 主题的正确方法是什么

我有一个消费者脚本,它处理每条消息并手动向主题提交偏移量。

CONSUMER = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset="earliest",
    max_poll_records=100,
    enable_auto_commit=False,
    group_id=CONSUMER_GROUP,
    # Use the RoundRobinPartition method
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

while True:
    count += 1
    LOGGER.info("--------------Poll {0}---------".format(count))
    for msg in CONSUMER:
        # Process msg.value
        # Commit offset to topic
        tp = TopicPartition(msg.topic, msg.partition)
        offsets = {tp: OffsetAndMetadata(msg.offset, None)}
        CONSUMER.commit(offsets=offsets)
Run Code Online (Sandbox Code Playgroud)

处理每条消息所需的时间 < 1 秒。

我收到此错误错误:

kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent …
Run Code Online (Sandbox Code Playgroud)

python python-3.x apache-kafka kafka-python

5
推荐指数
1
解决办法
3438
查看次数

如何以编程方式检查 Kafka Broker 是否已启动并在 Python 中运行

我正在尝试使用来自 Kafka 主题的消息。我在confluent_kafka消费者周围使用包装器。在开始使用消息之前,我需要检查是否建立了连接。

我读到消费者很懒惰,所以我需要执行一些操作才能建立连接。但是我想在不执行consumeorpoll操作的情况下检查连接建立。

此外,我尝试给出一些错误的配置,以查看民意调查的反应是什么。我得到的回应是:

b'Broker: No more messages'
Run Code Online (Sandbox Code Playgroud)

那么,如何判断是连接参数错误、连接中断还是主题中实际上没有消息?

python apache-kafka kafka-consumer-api kafka-python

5
推荐指数
1
解决办法
4521
查看次数

如何在 Kafka 中实现请求-回复(同步)消息传递范式?

我将在我的应用程序中使用 Kafka 作为消息代理。此应用程序完全使用 Python 编写。对于此应用程序的一部分(登录和身份验证),我需要实现一个请求-回复消息传递系统。换句话说,生产者需要同步地从消费者那里得到生产消息的响应。使用 Kafka 及其 Python 库 (kafka-python, ...) 是否可行?

python synchronous apache-kafka kafka-python

5
推荐指数
1
解决办法
644
查看次数

更新Kafka主题中的消息

我正在使用Python Kafka主题.

是否有任何提供生产者可以更新Kafka队列中的消息并将其再次附加到队列的顶部?

根据卡夫卡的规范,这似乎不可行.

python apache-kafka kafka-consumer-api kafka-python

4
推荐指数
1
解决办法
2449
查看次数

Python如何删除Kafka主题下的所有消息

我是卡夫卡新手。我们正在尝试将数据从 csv 文件导入到 Kafka。我们需要每天导入,同时前一天的数据已被废弃。如何在Python中删除Kafka主题下的所有消息?或者如何删除 python 中的 Kafka 主题?或者我看到有人建议等待数据过期,如果可以的话我该如何设置数据过期时间?任何建议将不胜感激!

谢谢

python python-2.7 apache-kafka kafka-python

4
推荐指数
1
解决办法
1万
查看次数

kafka-python-如何提交分区?

使用kafka-python-1.0.2。

如果我有一个包含10个分区的主题,那么如何遍历各个分区和消息,同时提交一个特定的分区。我似乎无法在文档中或其他任何地方找到此示例

从文档中,我想使用:

consumer.commit(offset=offsets)

具体来说,如何创建偏移量所需的分区和OffsetAndMetadata字典(字典,可选)– {TopicPartition:OffsetAndMetadata}。

我希望函数调用就像这样:

consumer.commit(partition, offset)

但这似乎并非如此。

提前致谢。

python kafka-consumer-api kafka-python

4
推荐指数
3
解决办法
7815
查看次数

如何强制消费者读取kafka中的特定分区

我有一个应用程序,用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容。我创建了一个有 5 个分区的主题,并且有 5 个 kafka 消费者。但是,网页下载的超时时间为 60 秒。当其中一个 url 被下载时,服务器假定消息丢失并将数据重新发送给不同的消费者。

我已经尝试了中提到的所有内容

Kafka 消费者配置/性能问题

https://github.com/spring-projects/spring-kafka/issues/202

但我每次都会收到不同的错误。

是否可以将特定消费者与 kafka 中的分区联系起来?我正在为我的应用程序使用 kafka-python

apache-kafka kafka-python

4
推荐指数
1
解决办法
1万
查看次数

如何在 Python 中使用 kafka 客户端描述主题

我是 python 中 kafka 客户端的初学者,我需要一些帮助来描述使用客户端的主题。

我能够使用以下代码列出我所有的 kafka 主题:-

consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['kafka1'])
topicList = consumer.topics()
Run Code Online (Sandbox Code Playgroud)

python kafka-python

3
推荐指数
2
解决办法
3077
查看次数