对于消费者群体,Kafka Stream偏移重置为零

Par*_*ari 5 apache-kafka apache-kafka-streams

我编写了Kafka Streaming应用程序,它只根据某些条件过滤行并将其加载到MongoDB.

流式处理工作正常但由于我的代码存在一些缺陷,我想再次重新处理整个数据.

一种方法是杀死流媒体应用,更改消费者群组ID,从mongo删除数据并重新运行应用程序.

如何在不更改使用者组ID的情况下实现此方案.

"我正在使用Kafka 0.10版本"

非常感谢Pari

Mic*_*oll 7

Apache Kafka 0.10.0.1(已于8月发布,而7月份提出原始问题)附带了一个新的Kafka Streams应用程序重置工具,这是一种比简单重命名更简单,更好/更清晰的解决方案application.id.

您可以通过脚本执行该工具,该脚本bin/kafka-streams-application-reset.sh还将打印使用/帮助消息.

例:

# Run this only after ALL application instances were stopped!
$ bin/kafka-streams-application-reset --application-id my-streams-app \
                                      --input-topics my-input-topic \
                                      --intermediate-topics rekeyed-topic \
                                      --bootstrap-servers brokerHost:9092 \
                                      --zookeeper zookeeperHost:2181
Run Code Online (Sandbox Code Playgroud)

也就是说,我建议阅读Kafka Streams的数据重新处理:重置Streams应用程序,前面提到的Matthias J. Sax写道,以获取更多细节.那篇文章还解释了为什么简单地重命名application.id(这是迄今为止的解决方法)并不是最好的主意.