相关疑难解决方法(0)

让Kafka Consumer一次只读一条消息

我们有Kafka设置,可以通过多个服务器并行处理消息.但每条消息必须只处理一次(并且只能由一台服务器处理).我们已经启动并运行它并且工作正常.

现在,我们面临的问题是Kafka消费者分批阅读消息以获得最大效率.如果/当处理失败,服务器关闭或其他什么时,这会导致问题,因为这样我们就会丢失即将处理的数据.

有没有办法让消费者一次只读取消息让Kafka保留未处理的消息?就像是; 消费者在完成后拉出一条消息 - >进程 - >提交偏移,重复.使用Kafka这是可行的吗?有什么想法/想法吗?

谢谢!

apache-kafka

10
推荐指数
2
解决办法
1万
查看次数

一旦所有消息都用完了,如何关闭kafka用户?

我有以下程序可以使用所有发送到Kafka的消息。

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
KafkaConsumer.close()
Run Code Online (Sandbox Code Playgroud)

使用以上程序,我可以使用所有发送到Kafka的消息。但是,一旦所有消息都消耗掉了,我想关闭没有发生的kafka用户。我同样需要帮助。

python python-3.x kafka-consumer-api

5
推荐指数
1
解决办法
3176
查看次数