小编Vis*_*ish的帖子

在Kafka 0.11中sendOffsetsToTransaction的含义

新的Kafka版本(0.11)只支持一次语义.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

我在java中有一个使用kafka事务代码的生成器设置,就像这样.

producer.initTransactions();
    try {
        producer.beginTransaction();
        for (ProducerRecord<String, String> record : payload) {
            producer.send(record);
        }

        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
            {
                put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
            }
        };
        producer.sendOffsetsToTransaction(groupCommit, "groupId");
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        producer.close();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }
Run Code Online (Sandbox Code Playgroud)

我不太确定如何使用sendOffsetsToTransaction以及它的预期用例.AFAIK,消费者群体是消费者端的多线程阅读功能.

贾瓦多克说

"发送消耗的偏移列表的消费群协调,也标志着这些偏移作为当前事务的一部分.这些偏移仅如果交易成功提交才算消耗.当你需要批量消费应该用这种方法并且一起产生消息,通常是消费转换产生模式."

如何生成维护消耗的偏移列表?什么意思呢?

java multithreading apache-kafka kafka-producer-api

7
推荐指数
1
解决办法
1106
查看次数