我们有Kafka设置,可以通过多个服务器并行处理消息.但每条消息必须只处理一次(并且只能由一台服务器处理).我们已经启动并运行它并且工作正常.
现在,我们面临的问题是Kafka消费者分批阅读消息以获得最大效率.如果/当处理失败,服务器关闭或其他什么时,这会导致问题,因为这样我们就会丢失即将处理的数据.
有没有办法让消费者一次只读取消息让Kafka保留未处理的消息?就像是; 消费者在完成后拉出一条消息 - >进程 - >提交偏移,重复.使用Kafka这是可行的吗?有什么想法/想法吗?
谢谢!
我有以下程序可以使用所有发送到Kafka的消息。
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
group_id='my-group',
bootstrap_servers=['my_kafka:9092'])
for message in consumer:
consumer.commit()
print ("%s key=%s value=%s" % (message.topic,message.key,
message.value))
KafkaConsumer.close()
Run Code Online (Sandbox Code Playgroud)
使用以上程序,我可以使用所有发送到Kafka的消息。但是,一旦所有消息都消耗掉了,我想关闭没有发生的kafka用户。我同样需要帮助。