小编Jas*_*hoi的帖子

是否可以在 kafka 连接器中重置 kafka 消费者组主题的偏移量?

我的 kafka 接收器连接器从多个主题(配置了 10 个任务)读取数据,并处理来自所有主题的超过 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。

以下是触发器记录中键:值对的示例:

"REPROCESS":"my-topic-1"

读取此记录后,我需要将主题“my-topic-1”在其每个分区中的偏移量重置为 0。

我在很多地方读到,创建一个新的KafkaConsumer、订阅主题的分区,然后调用该subscribe(...)方法是推荐的方式。例如,

public class MyTask extends SinkTask {

    @Override
    public void put(Collection<SinkRecord> records) {
        records.forEach(record -> {
        if (record.key().toString().equals("REPROCESS")) {
            reprocessTopicRecords(record);
        } else {
            // do something else
        }
        });
    }
    private void reprocessTopicRecords(SinkRecord record) {
        KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer = 
            new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
        reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
            new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 
                    // do offset reset here
                }
            }
        ); …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api apache-kafka-connect

7
推荐指数
1
解决办法
2万
查看次数