如何在 foreachBatch 函数中打印/记录输出?

kka*_*k12 10 apache-spark databricks spark-structured-streaming

使用表流,我尝试使用 foreachBatch 写入流

df.writestream
.format("delta")
.foreachBatch(WriteStreamToDelta)
...
Run Code Online (Sandbox Code Playgroud)

WriteStreamToDelta 看起来像

def WriteStreamToDelta(microDF, batch_id):
   microDFWrangled = microDF."some_transformations"
   
   print(microDFWrangled.count()) <-- How do I achieve the equivalence of this?

   microDFWrangled.writeStream...
Run Code Online (Sandbox Code Playgroud)

我想查看其中的行数

  1. 笔记本,位于 writeStream 单元下方
  2. 驾驶员日志
  3. 创建一个列表以附加每个微批次的行数。

Ale*_*lok 1

在每个微批次中运行microDFWrangled.count()将是一件有点昂贵的事情。我相信更有效的是涉及StreamingQueryListener可以将输出发送到控制台、驱动程序日志、外部数据库等。

StreamingQueryListener 非常高效,因为它使用内部流统计信息,因此无需运行额外的计算来获取记录计数。

但是,对于 PySpark,此功能可在从 11.0 开始的 Databricks 中使用。在 OSS Spark 中,我认为它仅在最新版本后才可用

参考: https: //www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html


如果您仍然想使用 print() 发送输出,请考虑添加.awaitTermination()为最后一个链接语句:

df.writestream
.format("delta")
.foreachBatch(WriteStreamToDelta)
.Start()
.awaitTermination()
Run Code Online (Sandbox Code Playgroud)