Leo*_*ima 5 python apache-spark pyspark databricks azure-databricks
我正在尝试使用 Autoloader 加载多种类型的 csv 文件,它当前将我放入一个大镶木地板表中的所有 csv 合并,我想要的是为每种类型的 schema/csv_file 创建镶木地板表
#Streaming files/ waiting a file to be dropped
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("delimiter", "~|~") \
.option("cloudFiles.inferColumnTypes","true") \
.option("cloudFiles.schemaLocation", pathCheckpoint) \
.load(sourcePath) \
.writeStream \
.format("delta") \
.option("mergeSchema", "true") \
.option("checkpointLocation", pathCheckpoint) \
.start(pathResult)
Run Code Online (Sandbox Code Playgroud)
小智 2
您可以为同一源目录中的每个文件创建不同的自动加载器流,并使用自动加载器上的pathGlobFilter选项过滤要使用的文件名(databricks 文档)。通过这样做,您最终将为每个表设置不同的检查点,并且可以使不同的模式正常工作。
此解决方案类似于此处指出的解决方案如何过滤 Databricks Autoloader 流中的文件。
| 归档时间: |
|
| 查看次数: |
814 次 |
| 最近记录: |