Kafka Streams:ConsumerRebalanceListener 实现

Ram*_*man 2 apache-kafka apache-kafka-streams

您能否告知以下类需要如何在流配置中注册?

public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {

  static final Logger oLogger = Logger.getLogger(StreamConsumerRebalanceListener.class);

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition p : partitions) {
        oLogger.info(p + " partitions has been assigned to the stream instance");
    }

  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition p : partitions) {
        oLogger.warn(p + " partitions has been removed from the stream instance");
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

Mat*_*Sax 5

Kafka Streams 不会公开 API 来指定自定义,ConsumerRebalanceListener因为 Kafka Streams 使用自己的实现,该实现会传递给内部使用的KafkaConsumer.

请注意,内部使用的侦听器会在 INFO 模式下记录分配,并在 DEBUG 模式下记录一些附加日志。因此,不需要添加额外的自定义日志记录。

如果这是一个关键功能,请随时创建功能请求 JIRA: https: //issues.apache.org/jira/projects/KAFKA

更新:

如果您使用 a Processor(或Transformer类似的),您也许可以使用init()andclose()来代替。这些在分配分区之后和撤销分区之前调用。