Spark 结构化流 maxOffsetsPerTrigger 似乎不起作用

jam*_*ann 8 apache-spark spark-structured-streaming

我遇到了一个 Spark 结构化流 (SSS) 应用程序的问题,该应用程序由于程序错误而崩溃,并且在周末没有处理。当我重新启动它时,有许多关于要重新处理的主题的消息(需要加入的 3 个主题各有 250'000 条消息)。

重新启动时,应用程序再次崩溃并出现 OutOfMemory 异常。我从文档中了解到,maxOffsetsPerTrigger在这些情况下,读取流上的配置应该会有所帮助。我更改了 PySpark 代码(在 SSS 2.4.3 上运行),所有 3 个主题都具有以下内容

 rawstream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topicName)
    .option("maxOffsetsPerTrigger", 10000L)
    .option("startingOffsets", "earliest")
    .load()
Run Code Online (Sandbox Code Playgroud)

我的期望是现在 SSS 查询将从每个主题加载 ~33'000 个偏移量并将它们加入第一批。然后在第二批中,它将清除第一批中的状态记录,由于水印而到期(这将清除第一批中的大部分记录),然后从每个主题中再读取约 33k。因此,在大约 8 个批次之后,它应该已经处理了延迟,并具有“合理”的内存量。

但是应用程序仍然因 OOM 而崩溃,当我检查应用程序主 UI 中的 DAG 时,它报告它再次尝试读取所有 250'000 条消息。

还有什么我需要配置的吗?我如何检查这个选项是否真的被使用?(当我检查计划时,不幸的是它被截断了,只是显示(Options: [includeTimestamp=true,subscribe=IN2,inferSchema=true,failOnDataLoss=false,kafka.b...),我不知道如何在点之后显示部分)

小智 0

您必须删除检查点目录才能刷新新配置。