如何使用“触发一次”触发器控制 Spark 结构化流中每个触发器处理的文件量?

Mat*_*ler 9 python apache-spark pyspark databricks spark-structured-streaming

我正在尝试使用 Spark 结构化流处理的功能“触发一次”来模拟批量类似的设置。但是,当我运行初始批次时,我遇到了一些麻烦,因为我有很多历史数据,因此我还使用选项 .option ("cloudFiles.includeExistingFiles", "true")来处理现有文件。

因此,我的初始批次变得非常大,因为我无法控制该批次的文件数量。

我还尝试使用选项cloudFiles.maxBytesPerTrigger,但是,当您使用 Trigger 一次时,这会被忽略 --> https://docs.databricks.com/spark/latest/structed-streaming/auto-loader-gen2.html

当我指定maxFilesPerTrigger选项时,它也会被忽略。它只需要所有可用的文件。

我的代码如下所示:

df = (
  spark.readStream.format("cloudFiles")
    .schema(schemaAsStruct)
    .option("cloudFiles.format", sourceFormat)
    .option("delimiter", delimiter)
    .option("header", sourceFirstRowIsHeader)
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("badRecordsPath", badRecordsPath)
    .option("maxFilesPerTrigger", 1)
    .option("cloudFiles.resourceGroup", omitted)
    .option("cloudFiles.region", omitted)
    .option("cloudFiles.connectionString", omitted)
    .option("cloudFiles.subscriptionId", omitted)
    .option("cloudFiles.tenantId", omitted)
    .option("cloudFiles.clientId", omitted)
    .option("cloudFiles.clientSecret", omitted)
    .load(sourceBasePath)
)

# Traceability columns
df = (
  df.withColumn(sourceFilenameColumnName, input_file_name()) 
    .withColumn(processedTimestampColumnName, lit(processedTimestamp))
    .withColumn(batchIdColumnName, lit(batchId))
)

def process_batch(batchDF, id):
  batchDF.persist()
  
  (batchDF
     .write
     .format(destinationFormat)
     .mode("append")
     .save(destinationBasePath + processedTimestampColumnName + "=" +  processedTimestamp)
  )
    
  (batchDF
   .groupBy(sourceFilenameColumnName, processedTimestampColumnName)
   .count()
   .write
   .format(destinationFormat)
   .mode("append")
   .save(batchSourceFilenamesTmpDir))
  
  batchDF.unpersist()

stream = (
  df.writeStream
    .foreachBatch(process_batch)
    .trigger(once=True)
    .option("checkpointLocation", checkpointPath)
    .start()
)
Run Code Online (Sandbox Code Playgroud)

如您所见,我使用的是cloudfiles格式,这是 Databricks Autoloader 的格式 --> https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-gen2.html

当新数据文件到达云存储时,自动加载程序会增量且高效地处理新数据文件。

Auto Loader 提供了一个名为 cloudFiles 的结构化流源。给定云文件存储上的输入目录路径,cloudFiles 源会在新文件到达时自动处理新文件,并且可以选择处理该目录中的现有文件

如果我以某种令人困惑的方式提出我的问题或者缺乏信息,请说出来。

Ale*_*Ott 5

不幸的是,Spark 3.x (DBR >= 7.x) 完全忽略了诸如 等maxFilesPerTrigger限制拉取处理的数据量的选项 - 在这种情况下,它将尝试一次性处理所有数据,有时可能会导致性能问题。

要解决此问题,您可以执行以下 hack 定期检查 的值stream.get('numInputRows'),如果它在一段时间内等于 0,请发出stream.stop()

2021 年 10 月更新:看起来它将通过引入新的触发器类型在 Spark 3.3 中修复 - Trigger.AvailableNow(请参阅SPARK-36533