Kar*_*ddy 6 scala apache-spark kafka-consumer-api spark-structured-streaming
我在尝试使用 Spark 结构化流从 Kafka 读取数据时遇到了一个奇怪的问题。我的用例是能够从可用的最大/最新偏移量中读取主题。
我的读取配置:
val data = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "some xyz server")
.option("subscribe", "sampletopic")
.option("auto.offset.reset", "latest")
.option("startingOffsets", "latest")
.option("kafkaConsumer.pollTimeoutMs", 20000)
.option("failOnDataLoss","false")
.option("maxOffsetsPerTrigger",20000)
.load()
Run Code Online (Sandbox Code Playgroud)
我的写入配置:
data
.writeStream
.outputMode("append")
.queryName("test")
.format("parquet")
.option("checkpointLocation", "s3://somecheckpointdir")
.start("s3://outpath").awaitTermination()
Run Code Online (Sandbox Code Playgroud)
使用的版本:
spark-core_2.11 : 2.2.1spark-sql_2.11 : 2.2.1spark-sql-kafka-0-10_2.11 : 2.2.1我已经在网上和 [Kafka 文档]( https://kafka.apache.org/0100/documentation.html0/
我正在使用新的消费者 api,正如文档建议的那样,我只需将 auto.offset.reset 设置为“最新”或将startingOffsets设置为“最新”,以确保我的 Spark 作业开始从 Kafka 中每个分区可用的最新偏移量开始消耗。
我还知道,该设置auto.offset.reset仅在第一次启动新查询时启动,而不是在应用程序重新启动时启动,在这种情况下它将继续从上次保存的偏移量读取。
我正在使用 s3 来检查我的偏移量。我看到它们是在 s3://somecheckpointdir 下生成的。
我面临的问题是,Spark 作业始终从最早的偏移量读取,即使在应用程序首次启动时在代码中指定了最新选项,并且我在 Spark 日志中看到了这一点。
auto.offset.reset = earliest正在使用。我还没有看到与这个特定问题相关的帖子。
我想知道我是否在这里遗漏了一些东西,以及是否有人以前见过这种行为。任何帮助/指导确实会有用。谢谢。
kafka.前缀。因此正确的选项键是kafka.auto.offset.reset.auto.offset.reset. 相反,“设置源选项startingOffsets来指定从哪里开始。结构化流管理内部消费哪些偏移量,而不是依赖kafka Consumer来完成。这将确保动态订阅新主题/分区时不会丢失任何数据.请注意,startingOffsets仅在启动新的流式查询时适用,并且恢复将始终从查询停止的位置开始。” [1]| 归档时间: |
|
| 查看次数: |
11091 次 |
| 最近记录: |