我正在尝试使用 Spark 结构化流处理的功能“触发一次”来模拟批量类似的设置。但是,当我运行初始批次时,我遇到了一些麻烦,因为我有很多历史数据,因此我还使用选项 .option ("cloudFiles.includeExistingFiles", "true")来处理现有文件。
因此,我的初始批次变得非常大,因为我无法控制该批次的文件数量。
我还尝试使用选项cloudFiles.maxBytesPerTrigger,但是,当您使用 Trigger 一次时,这会被忽略 --> https://docs.databricks.com/spark/latest/structed-streaming/auto-loader-gen2.html
当我指定maxFilesPerTrigger选项时,它也会被忽略。它只需要所有可用的文件。
我的代码如下所示:
df = (
spark.readStream.format("cloudFiles")
.schema(schemaAsStruct)
.option("cloudFiles.format", sourceFormat)
.option("delimiter", delimiter)
.option("header", sourceFirstRowIsHeader)
.option("cloudFiles.useNotifications", "true")
.option("cloudFiles.includeExistingFiles", "true")
.option("badRecordsPath", badRecordsPath)
.option("maxFilesPerTrigger", 1)
.option("cloudFiles.resourceGroup", omitted)
.option("cloudFiles.region", omitted)
.option("cloudFiles.connectionString", omitted)
.option("cloudFiles.subscriptionId", omitted)
.option("cloudFiles.tenantId", omitted)
.option("cloudFiles.clientId", omitted)
.option("cloudFiles.clientSecret", omitted)
.load(sourceBasePath)
)
# Traceability columns
df = (
df.withColumn(sourceFilenameColumnName, input_file_name())
.withColumn(processedTimestampColumnName, lit(processedTimestamp))
.withColumn(batchIdColumnName, lit(batchId))
)
def process_batch(batchDF, id):
batchDF.persist()
(batchDF
.write
.format(destinationFormat)
.mode("append")
.save(destinationBasePath + processedTimestampColumnName + "=" …Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark databricks spark-structured-streaming