使用命令行检查消息是否已写入 kafka 主题

Ang*_*g D 3 apache-kafka

首先,请注意,使用 java 消费者 API 不是一种选择。为什么它不是一个我无法透露的选项,但我必须能够使用 shell 命令执行以下操作。

我有一个已向其写入消息的主题,如果我使用./kafka-console-consumer.sh--from-beginning选项运行,我可以确认是这种情况,但由于这会启动一个使用者,因此该命令会卡住并需要使用 SIGINT 进行手动干预。我已经接近使用--timeout-ms,但是这并不理想,因为除非我选择一个高值,否则数据转储可能不可靠。
我想以可以 grep 的方式转储控制台消费者的输出,或者使用合适的替代方法。

Rob*_*att 6

当您写入 Kafka 时,您可以在生产者中 acks设置您希望代理保证消息已被本地代理和/或所有副本接收并写入的保证级别。

如果您使用它,那么您无需尝试从主题中使用来确定记录是否已写入。这听起来是一个非常糟糕的尝试。

如果您绝对必须使用命令行工具来执行此操作(这不是一个好主意),那么使用kafkacat它可以从任何偏移量消耗任意数量的消息,例如:

  • 从头 ( )开始消耗 ( -C) 5 条消息 ( ),或在到达分区末尾时退出 ( )-c 5-o beginning-e

    kafkacat -b localhost:9092 -t mytopic -o beginning -e -C -c 5
    
    Run Code Online (Sandbox Code Playgroud)
  • 从末尾( )开始消耗 ( ) -C10 条消息 ( ) ,或在到达分区末尾时退出 ( )-c 10-o -10-e

    kafkacat -b localhost:9092 -t mytopic -o -10 -e -C -c 10
    
    Run Code Online (Sandbox Code Playgroud)
  • 在偏移量 42 ( ) 处消耗 ( )-C一条消息 ( ) ,或在到达分区末尾时退出 ( )-c 1-o 42-e

    kafkacat -b localhost:9092 -t mytopic -o 42 -e -C -c 1
    
    Run Code Online (Sandbox Code Playgroud)