Pow*_*ers 10 scala apache-spark spark-structured-streaming
有一个CSV文件的数据湖,全天更新.我正在尝试使用此博客文章中概述的Trigger.Once功能创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据.
这就是我所拥有的:
val df = spark
.readStream
.schema(s)
.csv("s3a://csv-data-lake-files")
Run Code Online (Sandbox Code Playgroud)
以下命令将所有数据写入Parquet湖,但在写完所有数据后没有停止(我必须手动取消作业).
processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
Run Code Online (Sandbox Code Playgroud)
以下工作也有效,但在写完所有数据后都没有停止(我不得不手动取消工作):
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
以下命令在写入任何数据之前停止查询.
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.stop()
Run Code Online (Sandbox Code Playgroud)
如何配置writeStream查询以等待所有增量数据写入Parquet文件然后停止?
我得到了 Structured Streaming + Trigger.Once 以便在 Parquet 数据湖上正常工作。
我认为它不能与 CSV 数据湖一起使用,因为 CSV 数据湖在嵌套目录中有大量小文件。Spark 不喜欢使用小 CSV 文件(我认为它需要打开所有文件才能读取标头),并且非常讨厌需要全局 S3 目录。
所以我认为 Spark Structured Streaming + Trigger.Once 代码很好 - 他们只需要让 CSV 阅读器技术更好。
结构化流的主要目的是连续处理数据,而无需在新数据到达时启动/停止流。阅读本文了解更多详情。
从 Spark 2.0.0 开始,StreamingQuery有方法processAllAvailable等待所有源数据被处理并提交到接收器。请注意,scala 文档声明此方法仅用于测试目的。
因此,代码应该如下所示(如果您仍然想要它):
query.processAllAvailable
query.stop
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2354 次 |
| 最近记录: |