Ram*_*man 2 apache-kafka apache-kafka-streams
您能否告知以下类需要如何在流配置中注册?
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {
static final Logger oLogger = Logger.getLogger(StreamConsumerRebalanceListener.class);
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
oLogger.info(p + " partitions has been assigned to the stream instance");
}
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
oLogger.warn(p + " partitions has been removed from the stream instance");
}
}
}
Run Code Online (Sandbox Code Playgroud)
Kafka Streams 不会公开 API 来指定自定义,ConsumerRebalanceListener
因为 Kafka Streams 使用自己的实现,该实现会传递给内部使用的KafkaConsumer
.
请注意,内部使用的侦听器会在 INFO 模式下记录分配,并在 DEBUG 模式下记录一些附加日志。因此,不需要添加额外的自定义日志记录。
如果这是一个关键功能,请随时创建功能请求 JIRA: https: //issues.apache.org/jira/projects/KAFKA
更新:
如果您使用 a Processor
(或Transformer
类似的),您也许可以使用init()
andclose()
来代替。这些在分配分区之后和撤销分区之前调用。
归档时间: |
|
查看次数: |
1290 次 |
最近记录: |