Spark 结构化流查询始终以 auto.offset.rest=earliest 开头,即使设置了 auto.offset.reset=latest

Kar*_*ddy 6 scala apache-spark kafka-consumer-api spark-structured-streaming

我在尝试使用 Spark 结构化流从 Kafka 读取数据时遇到了一个奇怪的问题。我的用例是能够从可用的最大/最新偏移量中读取主题。

我的读取配置:

val data = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "some xyz server")
     .option("subscribe", "sampletopic")
     .option("auto.offset.reset", "latest")
     .option("startingOffsets", "latest")
     .option("kafkaConsumer.pollTimeoutMs", 20000)
     .option("failOnDataLoss","false")
     .option("maxOffsetsPerTrigger",20000)
     .load()
Run Code Online (Sandbox Code Playgroud)

我的写入配置:

data
    .writeStream
    .outputMode("append")
    .queryName("test") 
    .format("parquet")
    .option("checkpointLocation", "s3://somecheckpointdir")
    .start("s3://outpath").awaitTermination()
Run Code Online (Sandbox Code Playgroud)

使用的版本:

  • spark-core_2.11 : 2.2.1
  • spark-sql_2.11 : 2.2.1
  • spark-sql-kafka-0-10_2.11 : 2.2.1

我已经在网上和 [Kafka 文档]( https://kafka.apache.org/0100/documentation.html0/

我正在使用新的消费者 api,正如文档建议的那样,我只需将 auto.offset.reset 设置为“最新”或将startingOffsets设置为“最新”,以确保我的 Spark 作业开始从 Kafka 中每个分区可用的最新偏移量开始消耗。

我还知道,该设置auto.offset.reset仅在第一次启动新查询时启动,而不是在应用程序重新启动时启动,在这种情况下它将继续从上次保存的偏移量读取。

我正在使用 s3 来检查我的偏移量。我看到它们是在 s3://somecheckpointdir 下生成的。

我面临的问题是,Spark 作业始终从最早的偏移量读取,即使在应用程序首次启动时在代码中指定了最新选项,并且我在 Spark 日志中看到了这一点。 auto.offset.reset = earliest正在使用。我还没有看到与这个特定问题相关的帖子。

我想知道我是否在这里遗漏了一些东西,以及是否有人以前见过这种行为。任何帮助/指导确实会有用。谢谢。

zsx*_*ing 4

  1. 所有 Kafka 配置都应设置kafka.前缀。因此正确的选项键是kafka.auto.offset.reset.
  2. 你永远不应该设置auto.offset.reset. 相反,“设置源选项startingOffsets来指定从哪里开始。结构化流管理内部消费哪些偏移量,而不是依赖kafka Consumer来完成。这将确保动态订阅新主题/分区时不会丢失任何数据.请注意,startingOffsets仅在启动新的流式查询时适用,并且恢复将始终从查询停止的位置开始。” [1]

[1] http://spark.apache.org/docs/latest/structed-streaming-kafka-integration.html#kafka-specific-configurations