小编Mat*_*ler的帖子

如何使用“触发一次”触发器控制 Spark 结构化流中每个触发器处理的文件量?

我正在尝试使用 Spark 结构化流处理的功能“触发一次”来模拟批量类似的设置。但是,当我运行初始批次时,我遇到了一些麻烦,因为我有很多历史数据,因此我还使用选项 .option ("cloudFiles.includeExistingFiles", "true")来处理现有文件。

因此,我的初始批次变得非常大,因为我无法控制该批次的文件数量。

我还尝试使用选项cloudFiles.maxBytesPerTrigger,但是,当您使用 Trigger 一次时,这会被忽略 --> https://docs.databricks.com/spark/latest/structed-streaming/auto-loader-gen2.html

当我指定maxFilesPerTrigger选项时,它也会被忽略。它只需要所有可用的文件。

我的代码如下所示:

df = (
  spark.readStream.format("cloudFiles")
    .schema(schemaAsStruct)
    .option("cloudFiles.format", sourceFormat)
    .option("delimiter", delimiter)
    .option("header", sourceFirstRowIsHeader)
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("badRecordsPath", badRecordsPath)
    .option("maxFilesPerTrigger", 1)
    .option("cloudFiles.resourceGroup", omitted)
    .option("cloudFiles.region", omitted)
    .option("cloudFiles.connectionString", omitted)
    .option("cloudFiles.subscriptionId", omitted)
    .option("cloudFiles.tenantId", omitted)
    .option("cloudFiles.clientId", omitted)
    .option("cloudFiles.clientSecret", omitted)
    .load(sourceBasePath)
)

# Traceability columns
df = (
  df.withColumn(sourceFilenameColumnName, input_file_name()) 
    .withColumn(processedTimestampColumnName, lit(processedTimestamp))
    .withColumn(batchIdColumnName, lit(batchId))
)

def process_batch(batchDF, id):
  batchDF.persist()
  
  (batchDF
     .write
     .format(destinationFormat)
     .mode("append")
     .save(destinationBasePath + processedTimestampColumnName + "=" …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark databricks spark-structured-streaming

9
推荐指数
1
解决办法
6916
查看次数