PySpark 等待笔记本中完成 (Databricks)

a.p*_*ell 3 apache-spark pyspark databricks spark-structured-streaming

目前,我在一个单元格中使用 Spark 数据帧(自动加载器)时遇到一些问题,可能需要一些时间才能写入数据。然后,在下面的单元格中,代码引用第一个表完成的工作。但是,如果由于 Spark 的分布式特性而运行整个笔记本(特别是作为作业),则第二个单元会在第一个单元完全完成之前运行。如何让第二个单元等待 writeStream 完成,而不将它们放在单独的笔记本中。

例子:

小区1

autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
Run Code Online (Sandbox Code Playgroud)

细胞2

df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud)

Ale*_*Ott 5

您需要使用awaitTermination函数来等待流处理完成(请参阅文档)。像这样:

  • 细胞1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
autoload.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
  • 细胞2
df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud)

尽管它可以更容易阅读,也更难犯这样的错误:

df = spark.read.table('TABLE1').count()
Run Code Online (Sandbox Code Playgroud)

更新:等待多个流:

while len(spark.streams.active) > 0:
  spark.streams.resetTerminated() # Otherwise awaitAnyTermination() will return immediately after first stream has terminated
  spark.streams.awaitAnyTermination()
Run Code Online (Sandbox Code Playgroud)