Kafka Streams 在生成主题时不会将偏移量增加 1

DVS*_*DVS 5 java apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

我已经实现了一个简单的 Kafka 死信记录处理器。

当使用控制台生产者产生的记录时,它工作得很好。

但是,我发现我们的 Kafka Streams 应用程序并不能保证为接收器主题生成记录,对于生成的每个记录,偏移量将增加 1。

死信处理器背景:

我有一个场景,在发布处理记录所需的所有数据之前,可能会收到记录。当流应用程序处理的记录不匹配时,它们将移动到死信主题,而不是继续向下流。当新数据发布时,我们将来自死信主题的最新消息转储回流应用程序的源主题,以便使用新数据进行重新处理。

死信处理器:

  • 在运行应用程序开始时记录每个分区的结束偏移量
  • 如果重新处理的记录返回到死信主题,结束偏移量标记停止处理给定死信主题的记录的点,以避免无限循环。
  • 应用程序通过消费者组从上次运行产生的最后一个偏移量恢复。
  • 应用程序正在使用事务并KafkaProducer#sendOffsetsToTransaction提交最后产生的偏移量。

为了跟踪我的范围内的所有记录何时针对某个主题的分区被处理,我的服务将其从生产者的最后产生的偏移量与消费者保存的结束偏移量映射进行比较。当我们到达结束偏移量时,消费者通过以下方式暂停该分区KafkaConsumer#pause,当所有分区都暂停时(意味着它们到达保存的结束偏移量),然后调用它退出。

卡夫卡消费者API国:

偏移量和消费者位置 Kafka 为分区中的每条记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,并且还表示消费者在该分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。

卡夫卡生产者API引用下一偏移量始终是+1为好。

将指定偏移量列表发送给消费者组协调器,并将这些偏移量标记为当前事务的一部分。仅当事务成功提交时,这些偏移量才会被视为已提交。提交的偏移量应该是您的应用程序将使用的下一条消息,即 lastProcessedMessageOffset + 1。

但是您可以在我的调试器中清楚地看到,单个分区消耗的记录一次只增加 1 次... 在此处输入图片说明

我想这可能是 Kafka 配置问题,max.message.bytes但没有一个真正有意义。然后我想也许是因为加入,但没有看到任何会改变制片人运作方式的方式。

不确定它是否相关,但我们所有的 Kafka 应用程序都在使用 Avro 和 Schema Registry...

无论生产方法如何,偏移量是否应该始终增加 1,或者使用 Kafka 流 API 是否可能无法提供与普通生产者消费者客户端相同的保证?

有什么完全是我遗漏的吗?

Mat*_*Sax 9

It is not an official API contract that message offsets are increased by one, even if the JavaDocs indicate this (it seems that the JavaDocs should be updated).

  • 如果您不使用事务,您将获得至少一次语义或不保证(有些人称之为至少一次语义)。对于至少一次,记录可能会被写入两次,因此,由于重复写入“消耗”了两个偏移量,因此两个连续消息的偏移量并没有真正增加 1。

  • 如果您使用事务,则事务的每次提交(或中止)都会将提交(或中止)标记写入主题——这些事务标记也“消耗”一个偏移量(这是您观察到的)。

因此,通常您不应该依赖连续的偏移量。您得到的唯一保证是,每个偏移量在分区内都是唯一的。