使用 Databricks Auto Loader 摄取多种类型的 CSV

Leo*_*ima 5 python apache-spark pyspark databricks azure-databricks

我正在尝试使用 Autoloader 加载多种类型的 csv 文件,它当前将我放入一个大镶木地板表中的所有 csv 合并,我想要的是为每种类型的 schema/csv_file 创建镶木地板表

当前代码的作用是: 我目前拥有的

#Streaming files/ waiting a file to be dropped
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("delimiter", "~|~") \
  .option("cloudFiles.inferColumnTypes","true") \
  .option("cloudFiles.schemaLocation", pathCheckpoint) \
  .load(sourcePath) \
  .writeStream \
  .format("delta") \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", pathCheckpoint) \
  .start(pathResult)
Run Code Online (Sandbox Code Playgroud)

我想要的是在此输入图像描述

小智 2

您可以为同一源目录中的每个文件创建不同的自动加载器流,并使用自动加载器上的pathGlobFilter选项过滤要使用的文件名(databricks 文档)。通过这样做,您最终将为每个表设置不同的检查点,并且可以使不同的模式正常工作。

此解决方案类似于此处指出的解决方案如何过滤 Databricks Autoloader 流中的文件