a.p*_*ell 3 apache-spark pyspark databricks spark-structured-streaming
目前,我在一个单元格中使用 Spark 数据帧(自动加载器)时遇到一些问题,可能需要一些时间才能写入数据。然后,在下面的单元格中,代码引用第一个表完成的工作。但是,如果由于 Spark 的分布式特性而运行整个笔记本(特别是作为作业),则第二个单元会在第一个单元完全完成之前运行。如何让第二个单元等待 writeStream 完成,而不将它们放在单独的笔记本中。
例子:
小区1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
Run Code Online (Sandbox Code Playgroud)
细胞2
df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud)
您需要使用awaitTermination函数来等待流处理完成(请参阅文档)。像这样:
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
autoload.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud)
尽管它可以更容易阅读,也更难犯这样的错误:
df = spark.read.table('TABLE1').count()
Run Code Online (Sandbox Code Playgroud)
更新:等待多个流:
while len(spark.streams.active) > 0:
spark.streams.resetTerminated() # Otherwise awaitAnyTermination() will return immediately after first stream has terminated
spark.streams.awaitAnyTermination()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3939 次 |
| 最近记录: |