Jas*_*hoi 7 apache-kafka kafka-consumer-api apache-kafka-connect
我的 kafka 接收器连接器从多个主题(配置了 10 个任务)读取数据,并处理来自所有主题的超过 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。
以下是触发器记录中键:值对的示例:
"REPROCESS":"my-topic-1"
读取此记录后,我需要将主题“my-topic-1”在其每个分区中的偏移量重置为 0。
我在很多地方读到,创建一个新的KafkaConsumer、订阅主题的分区,然后调用该subscribe(...)方法是推荐的方式。例如,
public class MyTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) {
records.forEach(record -> {
if (record.key().toString().equals("REPROCESS")) {
reprocessTopicRecords(record);
} else {
// do something else
}
});
}
private void reprocessTopicRecords(SinkRecord record) {
KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer =
new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// do offset reset here
}
}
);
}
}
Run Code Online (Sandbox Code Playgroud)
但是,上述策略不适用于我的情况,因为: 1. 它取决于发生的组重新平衡(并不总是发生) 2. 传递给该onPartitionsAssigned方法的“分区”是动态分配的分区,这意味着这些只是需要重置偏移量的完整分区集。例如,此 SinkTask 将仅分配保存“my-topic-1”记录的 8 个分区中的 2 个。
我也研究过使用,assign()但这与 SinkConnector/SinkTask 实现中的分布式消费者模型(消费者组)不兼容。
我知道kafka命令行工具kafka-consumer-groups可以完全满足我的要求(我认为):
https ://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
总而言之,我想使用 Java API 重置给定主题的所有分区的偏移量,并让 Sink Connector 拾取偏移量更改并继续执行其一直在执行的操作(处理记录)。
提前致谢。
我能够通过使用一系列 Confluence 的 kafka-rest-proxy API 来重置 kafka 连接消费者组的偏移量:https://docs.confluence.io/current/kafka-rest/api.html
此实现不再需要原始帖子中首先描述的“触发记录”方法,并且纯粹基于 Rest API。
暂时删除kafka连接器(这会删除连接器的消费者和)
为同一个消费者组创建一个消费者实例(“connect-”)
让实例订阅您想要重置的请求主题
进行虚拟民意调查(“订阅”被延迟评估)
重置指定主题的消费者组主题偏移量
进行虚拟轮询('seek' 被延迟评估')为消费者提交当前偏移状态(在代理中)
重新创建kafka连接器(具有相同的连接器名称) - 重新平衡后,消费者将加入该组并读取最后提交的偏移量(从0开始)
删除临时消费者实例
如果您能够使用 CLI,则步骤 2-6 可以替换为:
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
对于那些尝试通过本机 Java API 在 kafka 连接器代码中执行此操作的人来说,您运气不好:-(
| 归档时间: |
|
| 查看次数: |
20810 次 |
| 最近记录: |