Mat*_*ler 9 python apache-spark pyspark databricks spark-structured-streaming
我正在尝试使用 Spark 结构化流处理的功能“触发一次”来模拟批量类似的设置。但是,当我运行初始批次时,我遇到了一些麻烦,因为我有很多历史数据,因此我还使用选项 .option ("cloudFiles.includeExistingFiles", "true")来处理现有文件。
因此,我的初始批次变得非常大,因为我无法控制该批次的文件数量。
我还尝试使用选项cloudFiles.maxBytesPerTrigger,但是,当您使用 Trigger 一次时,这会被忽略 --> https://docs.databricks.com/spark/latest/structed-streaming/auto-loader-gen2.html
当我指定maxFilesPerTrigger选项时,它也会被忽略。它只需要所有可用的文件。
我的代码如下所示:
df = (
spark.readStream.format("cloudFiles")
.schema(schemaAsStruct)
.option("cloudFiles.format", sourceFormat)
.option("delimiter", delimiter)
.option("header", sourceFirstRowIsHeader)
.option("cloudFiles.useNotifications", "true")
.option("cloudFiles.includeExistingFiles", "true")
.option("badRecordsPath", badRecordsPath)
.option("maxFilesPerTrigger", 1)
.option("cloudFiles.resourceGroup", omitted)
.option("cloudFiles.region", omitted)
.option("cloudFiles.connectionString", omitted)
.option("cloudFiles.subscriptionId", omitted)
.option("cloudFiles.tenantId", omitted)
.option("cloudFiles.clientId", omitted)
.option("cloudFiles.clientSecret", omitted)
.load(sourceBasePath)
)
# Traceability columns
df = (
df.withColumn(sourceFilenameColumnName, input_file_name())
.withColumn(processedTimestampColumnName, lit(processedTimestamp))
.withColumn(batchIdColumnName, lit(batchId))
)
def process_batch(batchDF, id):
batchDF.persist()
(batchDF
.write
.format(destinationFormat)
.mode("append")
.save(destinationBasePath + processedTimestampColumnName + "=" + processedTimestamp)
)
(batchDF
.groupBy(sourceFilenameColumnName, processedTimestampColumnName)
.count()
.write
.format(destinationFormat)
.mode("append")
.save(batchSourceFilenamesTmpDir))
batchDF.unpersist()
stream = (
df.writeStream
.foreachBatch(process_batch)
.trigger(once=True)
.option("checkpointLocation", checkpointPath)
.start()
)
Run Code Online (Sandbox Code Playgroud)
如您所见,我使用的是cloudfiles格式,这是 Databricks Autoloader 的格式 --> https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-gen2.html
“当新数据文件到达云存储时,自动加载程序会增量且高效地处理新数据文件。
Auto Loader 提供了一个名为 cloudFiles 的结构化流源。给定云文件存储上的输入目录路径,cloudFiles 源会在新文件到达时自动处理新文件,并且可以选择处理该目录中的现有文件”
如果我以某种令人困惑的方式提出我的问题或者缺乏信息,请说出来。
不幸的是,Spark 3.x (DBR >= 7.x) 完全忽略了诸如 等maxFilesPerTrigger限制拉取处理的数据量的选项 - 在这种情况下,它将尝试一次性处理所有数据,有时可能会导致性能问题。
要解决此问题,您可以执行以下 hack 定期检查 的值stream.get('numInputRows'),如果它在一段时间内等于 0,请发出stream.stop()
2021 年 10 月更新:看起来它将通过引入新的触发器类型在 Spark 3.3 中修复 - Trigger.AvailableNow(请参阅SPARK-36533)
| 归档时间: |
|
| 查看次数: |
6916 次 |
| 最近记录: |