jam*_*ann 8 apache-spark spark-structured-streaming
我遇到了一个 Spark 结构化流 (SSS) 应用程序的问题,该应用程序由于程序错误而崩溃,并且在周末没有处理。当我重新启动它时,有许多关于要重新处理的主题的消息(需要加入的 3 个主题各有 250'000 条消息)。
重新启动时,应用程序再次崩溃并出现 OutOfMemory 异常。我从文档中了解到,maxOffsetsPerTrigger在这些情况下,读取流上的配置应该会有所帮助。我更改了 PySpark 代码(在 SSS 2.4.3 上运行),所有 3 个主题都具有以下内容
rawstream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topicName)
.option("maxOffsetsPerTrigger", 10000L)
.option("startingOffsets", "earliest")
.load()
Run Code Online (Sandbox Code Playgroud)
我的期望是现在 SSS 查询将从每个主题加载 ~33'000 个偏移量并将它们加入第一批。然后在第二批中,它将清除第一批中的状态记录,由于水印而到期(这将清除第一批中的大部分记录),然后从每个主题中再读取约 33k。因此,在大约 8 个批次之后,它应该已经处理了延迟,并具有“合理”的内存量。
但是应用程序仍然因 OOM 而崩溃,当我检查应用程序主 UI 中的 DAG 时,它报告它再次尝试读取所有 250'000 条消息。
还有什么我需要配置的吗?我如何检查这个选项是否真的被使用?(当我检查计划时,不幸的是它被截断了,只是显示(Options: [includeTimestamp=true,subscribe=IN2,inferSchema=true,failOnDataLoss=false,kafka.b...),我不知道如何在点之后显示部分)
| 归档时间: |
|
| 查看次数: |
630 次 |
| 最近记录: |