如何在spark结构化流媒体中手动提交kafka偏移量?

use*_*499 7 apache-kafka apache-spark spark-structured-streaming

卡夫卡集成指南-我正在经历星火结构化数据流在这里.

在这个链接上被告知

enable.auto.commit:Kafka源不提交任何偏移量.

那么一旦我的spark应用程序成功处理了每条记录,我该如何手动提交偏移?

mik*_*ike 8

tl;博士

无法向 Kafka 提交任何消息。从 Spark 3.x 版本开始,您可以定义 Kafka 消费者组的名称,但是,这仍然不允许您提交任何消息。


从 Spark 3.0.0

根据结构化 Kafka 集成指南,您可以提供 ConsumerGroup 作为选项kafka.group.id

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "myConsumerGroup")
  .load()
Run Code Online (Sandbox Code Playgroud)

但是,Spark 仍然不会提交任何偏移量,因此您将无法“手动”向 Kafka 提交偏移量。此功能旨在使用基于角色的访问控制处理 Kafka 的最新功能授权,您的 ConsumerGroup 通常需要遵循命名约定。

此处讨论并解决了 Spark 3.x 应用程序的完整示例。

直到 Spark 2.4.x

Spark Structured Streaming + Kafka 集成指南清楚地说明了它如何管理 Kafka 偏移量。Spark不会将任何消息提交回 Kafka,因为它依靠内部偏移管理来实现容错。

用于管理偏移的最重要的 Kafka 配置是:

  • group.id:Kafka源会自动为每个查询创建一个唯一的组 ID。根据代码group.id 将被设置为
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
Run Code Online (Sandbox Code Playgroud)
  • auto.offset.reset:设置源选项startingOffsets 以指定从哪里开始。 Structured Streaming 管理内部消耗哪些偏移量,而不是依赖 kafka Consumer 来完成。
  • enable.auto.commit: Kafka 源不提交任何偏移量。

因此,在 Structured Streaming 中,目前无法为 Kafka Consumer 定义您的自定义 group.id,并且 Structured Streaming 在内部管理偏移量而不是提交回 Kafka(也不会自动提交)。

2.4.x 在行动

假设您有一个简单的 Spark Structured Streaming 应用程序,可以读取和写入 Kafka,如下所示:

// create SparkSession
val spark = SparkSession.builder()
  .appName("ListenerTester")
  .master("local[*]")
  .getOrCreate()

// read from Kafka topic
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "testingKafkaProducer")
  .option("failOnDataLoss", "false")
  .load()

// write to Kafka topic and set checkpoint directory for this stream
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "testingKafkaProducerOut")
  .option("checkpointLocation", "/home/.../sparkCheckpoint/")
  .start()
Run Code Online (Sandbox Code Playgroud)

Spark 偏移管理

一旦提交了这个应用程序并且正在处理数据,就可以在检查点目录中找到相应的偏移量:

myCheckpointDir/偏移量/

{"testingKafkaProducer":{"0":1}}
Run Code Online (Sandbox Code Playgroud)

这里检查点文件中的条目确认0要消耗的分区的下一个偏移量是1. 这意味着应用程序已经处理了名为 的主题0分区0的偏移量testingKafkaProducer

Spark文档中提供了有关容错语义的更多信息

卡夫卡的抵消管理

但是,如文档中所述,偏移量不会提交回 Kafka。这可以通过执行kafka-consumer-groups.shKafka 安装来检查。

./kafka/current/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group "spark-kafka-source-92ea6f85-[...]-driver-0"

TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID      HOST         CLIENT-ID
testingKafkaProducer 0          -               1               -    consumer-1-[...] /127.0.0.1   consumer-1
Run Code Online (Sandbox Code Playgroud)

Kafka不知道此应用程序的当前偏移量,因为它从未被提交。

可能的解决方法

请仔细阅读下面来自 Spark 提交者 @JungtaekLim 的关于解决方法的评论:“Spark 的容错保证基于 Spark 完全控制偏移管理的事实,如果他们试图修改它,他们就会使保证无效。 (例如,如果他们更改为向 Kafka 提交偏移量,则没有批次信息,并且如果 Spark 需要移回“后面”的特定批次,则保证不再有效。)”

我在网上看到的一些研究是,您可以onQueryProgress在自定义StreamingQueryListener的 Spark方法的回调函数中提交偏移量。这样,您就可以拥有一个跟踪当前进度的消费者组。然而,它的进展并不一定与实际的消费群体一致。

以下是您可能会觉得有用的一些链接:

  • 我的荣幸。实际上,我以某种方式访问​​了这个,因为有人被这个答案误导,并认为我的项目是实际问题的解决方案(关于 Kafka 数据源上的偏移问题),但事实并非如此,也不可能如此。 (2认同)