Kafka enable.auto.commit 设置为 false 但 poll 仍然获取“下一条”消息

Mik*_*man 5 apache-kafka message-hub

我想告诉 Kafka 我的消费者何时成功处理了一条记录,因此我通过设置为 false 关闭了自动提交enable.auto.commit。我有两条关于我在偏移量 0 和 1 处订阅的主题的消息,并创建了一个消费者,以便每次调用poll最多返回一条记录(通过设置max.poll.records为 1)。

我现在打电话consumer.poll(5000)并收到第一条消息,但我没有确认;我不打电话commitSynccommitAsync。如果我现在consumer.poll(5000)使用同一个消费者再次调用,我希望得到与刚刚读到的完全相同的消息,但相反,我收到了第二条消息。

在我明确确认之前,我如何才能consumer.poll继续发出相同的消息?

Mic*_*son 5

您所描述的是预期的行为。每次调用poll(),它都会返回下一条消息。您提交的偏移量仅在连接新消费者时使用,因此它知道从哪里(重新)开始。

在 MessageHub 中,我们将其设置session.timeout为 30 秒。所以你需要poll()稍微快点打电话,以免被断线。如果你的处理时间比这更长,那么我可以想到两个选择:

  • 使用 Kafka 0.10.2 并设置告诉您的 Kafka 客户端在处理上一条记录时max.poll.interval.ms保持会话处于活动状态(无需调用)。poll()(此功能已在 0.10.1 中添加,但我们不支持该版本。0.10.2 可以使用,因为它能够与 0.10.0 代理一起使用)

  • 使用seek() 移回之前的偏移量,poll以便它保持返回相同的记录。

希望这可以帮助!