Kafka如何阅读__consumer_offsets主题

Mar*_*ina 23 apache-kafka kafka-consumer-api

我正试图找出目前的高级消费者正在努力解决的问题.我使用Kafka 0.8.2.1,在Kafka的server.properties中没有设置"offset.storage" - 我认为这意味着偏移量存储在Kafka中.(I也证实没有偏移量存储在动物园管理员通过检查ZK壳此路径:/consumers/consumer_group_name/offsets/topic_name/partition_number)

我试着听听这个__consumer_offsets话题,看看哪个消费者保存了什么价值的抵消,但它没有用......

我尝试了以下方法:

为控制台使用者创建了一个配置文件如下:

=> more kafka_offset_consumer.config 

 exclude.internal.topics=false
Run Code Online (Sandbox Code Playgroud)

并尝试了两个版本的控制台消费者脚本:

#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181

#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config
Run Code Online (Sandbox Code Playgroud)

两者都没有奏效 - 它只是坐在那里但不打印任何东西,即使消费者正在积极消费/节省抵消.

我错过了一些其他配置/属性吗?

谢谢!

码头

小智 52

在尝试使用__consumer_offsets主题时,我遇到了这个问题.我设法找出了不同的卡夫卡版本,并认为我会分享我发现的东西

对于Kafka 0.8.2.x

#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
Run Code Online (Sandbox Code Playgroud)

对于Kafka 0.9.xx和0.10.0.0

#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
Run Code Online (Sandbox Code Playgroud)

对于0.11.0.0

#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
Run Code Online (Sandbox Code Playgroud)

  • 对于较新的版本(.10或更高版本,我假设)考虑使用--bootstrap-server:9092而不是--zookeeper:2181 (5认同)
  • 截至今天,formatter已更新为`kafka.coordinator.group.GroupMetadataManager\$ OffsetsMessageFormatter`. (4认同)
  • 为什么需要`exclude.internal.topics=false`?它可以在没有该选项的情况下消费。 (2认同)

cri*_*007 5

从Kafka 0.11开始,(Scala)源代码可以在这里找到

对于那些需要Java转换的用户,可以从任何Consumer流程中获得ConsumerRecord<byte[], byte[]> consumerRecord,然后可以使用

  1. 获取密钥(首先检查密钥是否不为null)并使用GroupMetadataManager.readMessageKey(consumerRecord.key)。那可以返回不同的类型,所以检查if ( ... instanceof OffsetKey),然后进行转换,您可以从中获得各种值。

  2. 要获取偏移量的Kafka记录值,可以使用 String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))

从Scala代码翻译过来的最小的Java示例...

byte[] key = consumerRecord.key;
if (key != null) {
    Object o = GroupMetadataManager.readMessageKey(key);
    if (o != null && o instanceOf OffsetKey) {
        OffsetKey offsetKey = (OffsetKey) o;
        Object groupTopicPartition = offsetKey.key;
        byte[] value = consumerRecord.value;
        String formattedValue = String.valueOf(GroupMetadataManager.readOffsetMessageValue(value);
       // TODO: Print, store, or compute results with the new key and value 
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,也可以使用AdminClient API来描述组,而不是使用这些原始消息


Scala源代码摘录

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
  Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
    // Only print if the message is an offset record.
    // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
    case offsetKey: OffsetKey =>
      val groupTopicPartition = offsetKey.key
      val value = consumerRecord.value
      val formattedValue =
        if (value == null) "NULL"
        else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
      output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
      output.write("::".getBytes(StandardCharsets.UTF_8))
      output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
      output.write("\n".getBytes(StandardCharsets.UTF_8))
    case _ => // no-op
  }
Run Code Online (Sandbox Code Playgroud)