新的Kafka版本(0.11)只支持一次语义.
我在java中有一个使用kafka事务代码的生成器设置,就像这样.
producer.initTransactions();
try {
producer.beginTransaction();
for (ProducerRecord<String, String> record : payload) {
producer.send(record);
}
Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
Run Code Online (Sandbox Code Playgroud)
我不太确定如何使用sendOffsetsToTransaction以及它的预期用例.AFAIK,消费者群体是消费者端的多线程阅读功能.
贾瓦多克说
"发送消耗的偏移列表的消费群协调,也标志着这些偏移作为当前事务的一部分.这些偏移仅如果交易成功提交才算消耗.当你需要批量消费应该用这种方法并且一起产生消息,通常是消费转换产生模式."
如何生成维护消耗的偏移列表?什么意思呢?