结构化流式 Kafka 源偏移存储

bug*_*ggy 0 offset apache-kafka apache-spark spark-streaming spark-structured-streaming

我正在使用 Kafka 的 Structured Streaming 源(集成指南),如前所述,它没有提交任何偏移量。

我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:

它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每个记录附带的偏移量不适合用例) ?

干杯

小智 5

kafka 的结构化流将偏移量保存到结构下方的 HDFS。

checkpointLocation 设置示例如下。

.writeStream.
.....
  option("checkpointLocation", "/tmp/checkPoint")
.....
Run Code Online (Sandbox Code Playgroud)

在这种情况下,kafka 的 Structured Streaming 保存在以下路径

/tmp/checkPoint/offsets/$'batchid'
Run Code Online (Sandbox Code Playgroud)

保存的文件包含以下格式。

v1
{"batchWatermarkMs":0,"batchTimestampMs":$'timestamp',"conf":{"spark.sql.shuffle.partitions":"200"}}
{"Topic1WithPartiton1":{"0":$'OffsetforTopic1ForPartition0'},"Topic2WithPartiton2":{"1":$'OffsetforTopic2ForPartition1',"0":$'OffsetforTopic2ForPartition1'}}
Run Code Online (Sandbox Code Playgroud)

例如。

v1
{"batchWatermarkMs":0,"batchTimestampMs":1505718000115,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"Topic1WithPartiton1":{"0":21482917},"Topic2WithPartiton2":{"1":103557997,"0":103547910}}
Run Code Online (Sandbox Code Playgroud)

因此,我认为为了监控偏移滞后,需要开发具有以下功能的自定义工具。

  • 从 HDFS 的偏移量中读取。
  • 将偏移量写入 Kafka __offset 主题。

这样,现有的偏移滞后监控工具可以监控结构化流媒体的 kafka 偏移滞后。