KafkaProducer sendOffsetsToTransaction 需要 offset+1 才能成功提交当前偏移量

1 scala apache-kafka kafka-producer-api apache-kafka-streams

我正在尝试在 Kafka 中实现事务,Processor以确保不会重复处理同一条消息。给定消息 (A),我需要创建将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A)。从文档中我发现该Producer方法sendOffsetsToTransaction似乎只有在成功时才能在事务中提交偏移量。process()这是我的方法中的代码Processor

    producer.beginTransaction()
    val topicPartition    = new TopicPartition(this.context().topic(), this.context().partition())
    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
    val map               = Map(topicPartition -> offsetAndMetadata).asJava
    producer.sendOffsetsToTransaction(map, "consumer-group-id")
    items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
    producer.commitTransaction()
    throw new RuntimeException("expected exception")
Run Code Online (Sandbox Code Playgroud)

不幸的是,使用这段代码(显然每次执行都会失败),每次我在异常后重新启动应用程序时,都会重新处理已处理的消息 (A)。

我设法使其工作,将 a 添加+1到返回的偏移量this.context().offset()val offsetAndMetadata以这种方式重新定义:

val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
Run Code Online (Sandbox Code Playgroud)

这是正常行为还是我做错了什么?

谢谢 :)

Mat*_*Sax 5

你的代码是正确的。

您提交的偏移量是您接下来要读取的消息的偏移量(而不是您上次读取的消息的偏移量)。

比较: https: //github.com/apache/kafka/blob/41e4e93b5ae8a7d221fce1733e050cb98ac9713c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L346