使用Spark Structured Streaming和Trigger.Once

Pow*_*ers 10 scala apache-spark spark-structured-streaming

有一个CSV文件的数据湖,全天更新.我正在尝试使用此博客文章中概述Trigger.Once功能创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据.

这就是我所拥有的:

val df = spark
  .readStream
  .schema(s)
  .csv("s3a://csv-data-lake-files")
Run Code Online (Sandbox Code Playgroud)

以下命令将所有数据写入Parquet湖,但在写完所有数据后没有停止(我必须手动取消作业).

processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")
Run Code Online (Sandbox Code Playgroud)

以下工作也有效,但在写完所有数据后都没有停止(我不得不手动取消工作):

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

以下命令在写入任何数据之前停止查询.

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.stop()
Run Code Online (Sandbox Code Playgroud)

如何配置writeStream查询以等待所有增量数据写入Parquet文件然后停止?

Pow*_*ers 5

我得到了 Structured Streaming + Trigger.Once 以便在 Parquet 数据湖上正常工作。

我认为它不能与 CSV 数据湖一起使用,因为 CSV 数据湖在嵌套目录中有大量小文件。Spark 不喜欢使用小 CSV 文件(我认为它需要打开所有文件才能读取标头),并且非常讨厌需要全局 S3 目录。

所以我认为 Spark Structured Streaming + Trigger.Once 代码很好 - 他们只需要让 CSV 阅读器技术更好。


Yur*_*ruk 2

结构化流的主要目的是连续处理数据,而无需在新数据到达时启动/停止流。阅读本文了解更多详情。

从 Spark 2.0.0 开始,StreamingQuery有方法processAllAvailable等待所有源数据被处理并提交到接收器。请注意,scala 文档声明此方法仅用于测试目的

因此,代码应该如下所示(如果您仍然想要它):

query.processAllAvailable
query.stop
Run Code Online (Sandbox Code Playgroud)

  • 这是有目的的。当你想省钱,因为闲置集群就是在浪费钱,那么触发 Once 就变得非常有价值 (4认同)