1 scala apache-kafka kafka-producer-api apache-kafka-streams
我正在尝试在 Kafka 中实现事务,Processor
以确保不会重复处理同一条消息。给定消息 (A),我需要创建将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A)。从文档中我发现该Producer
方法sendOffsetsToTransaction
似乎只有在成功时才能在事务中提交偏移量。process()
这是我的方法中的代码Processor
:
producer.beginTransaction()
val topicPartition = new TopicPartition(this.context().topic(), this.context().partition())
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
val map = Map(topicPartition -> offsetAndMetadata).asJava
producer.sendOffsetsToTransaction(map, "consumer-group-id")
items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
producer.commitTransaction()
throw new RuntimeException("expected exception")
Run Code Online (Sandbox Code Playgroud)
不幸的是,使用这段代码(显然每次执行都会失败),每次我在异常后重新启动应用程序时,都会重新处理已处理的消息 (A)。
我设法使其工作,将 a 添加+1
到返回的偏移量this.context().offset()
并val offsetAndMetadata
以这种方式重新定义:
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
Run Code Online (Sandbox Code Playgroud)
这是正常行为还是我做错了什么?
谢谢 :)
你的代码是正确的。
您提交的偏移量是您接下来要读取的消息的偏移量(而不是您上次读取的消息的偏移量)。