是否可以在 kafka 连接器中重置 kafka 消费者组主题的偏移量?

Jas*_*hoi 7 apache-kafka kafka-consumer-api apache-kafka-connect

我的 kafka 接收器连接器从多个主题(配置了 10 个任务)读取数据,并处理来自所有主题的超过 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。

以下是触发器记录中键:值对的示例:

"REPROCESS":"my-topic-1"

读取此记录后,我需要将主题“my-topic-1”在其每个分区中的偏移量重置为 0。

我在很多地方读到,创建一个新的KafkaConsumer、订阅主题的分区,然后调用该subscribe(...)方法是推荐的方式。例如,

public class MyTask extends SinkTask {

    @Override
    public void put(Collection<SinkRecord> records) {
        records.forEach(record -> {
        if (record.key().toString().equals("REPROCESS")) {
            reprocessTopicRecords(record);
        } else {
            // do something else
        }
        });
    }
    private void reprocessTopicRecords(SinkRecord record) {
        KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer = 
            new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
        reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
            new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 
                    // do offset reset here
                }
            }
        );
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,上述策略不适用于我的情况,因为: 1. 它取决于发生的组重新平衡(并不总是发生) 2. 传递给该onPartitionsAssigned方法的“分区”是动态分配的分区,这意味着这些只是需要重置偏移量的完整分区集。例如,此 SinkTask 将仅分配保存“my-topic-1”记录的 8 个分区中的 2 个。

我也研究过使用,assign()但这与 SinkConnector/SinkTask 实现中的分布式消费者模型(消费者组)不兼容。

我知道kafka命令行工具kafka-consumer-groups可以完全满足我的要求(我认为): https ://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

总而言之,我想使用 Java API 重置给定主题的所有分区的偏移量,并让 Sink Connector 拾取偏移量更改并继续执行其一直在执行的操作(处理记录)。

提前致谢。

Jas*_*hoi 5

我能够通过使用一系列 Confluence 的 kafka-rest-proxy API 来重置 kafka 连接消费者组的偏移量:https://docs.confluence.io/current/kafka-rest/api.html

此实现不再需要原始帖子中首先描述的“触发记录”方法,并且纯粹基于 Rest API。

  1. 暂时删除kafka连接器(这会删除连接器的消费者和)

  2. 为同一个消费者组创建一个消费者实例(“connect-”)

  3. 让实例订阅您想要重置的请求主题

  4. 进行虚拟民意调查(“订阅”被延迟评估)

  5. 重置指定主题的消费者组主题偏移量

  6. 进行虚拟轮询('seek' 被延迟评估')为消费者提交当前偏移状态(在代理中)

  7. 重新创建kafka连接器(具有相同的连接器名称) - 重新平衡后,消费者将加入该组并读取最后提交的偏移量(从0开始)

  8. 删除临时消费者实例

如果您能够使用 CLI,则步骤 2-6 可以替换为:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

对于那些尝试通过本机 Java API 在 kafka 连接器代码中执行此操作的人来说,您运气不好:-(