我的 kafka 接收器连接器从多个主题(配置了 10 个任务)读取数据,并处理来自所有主题的超过 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。
以下是触发器记录中键:值对的示例:
"REPROCESS":"my-topic-1"
读取此记录后,我需要将主题“my-topic-1”在其每个分区中的偏移量重置为 0。
我在很多地方读到,创建一个新的KafkaConsumer、订阅主题的分区,然后调用该subscribe(...)方法是推荐的方式。例如,
public class MyTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) {
records.forEach(record -> {
if (record.key().toString().equals("REPROCESS")) {
reprocessTopicRecords(record);
} else {
// do something else
}
});
}
private void reprocessTopicRecords(SinkRecord record) {
KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer =
new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// do offset reset here
}
}
); …Run Code Online (Sandbox Code Playgroud)