kka*_*k12 10 apache-spark databricks spark-structured-streaming
使用表流,我尝试使用 foreachBatch 写入流
df.writestream
.format("delta")
.foreachBatch(WriteStreamToDelta)
...
Run Code Online (Sandbox Code Playgroud)
WriteStreamToDelta 看起来像
def WriteStreamToDelta(microDF, batch_id):
microDFWrangled = microDF."some_transformations"
print(microDFWrangled.count()) <-- How do I achieve the equivalence of this?
microDFWrangled.writeStream...
Run Code Online (Sandbox Code Playgroud)
我想查看其中的行数
在每个微批次中运行microDFWrangled.count()将是一件有点昂贵的事情。我相信更有效的是涉及StreamingQueryListener可以将输出发送到控制台、驱动程序日志、外部数据库等。
StreamingQueryListener 非常高效,因为它使用内部流统计信息,因此无需运行额外的计算来获取记录计数。
但是,对于 PySpark,此功能可在从 11.0 开始的 Databricks 中使用。在 OSS Spark 中,我认为它仅在最新版本后才可用
参考: https: //www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
如果您仍然想使用 print() 发送输出,请考虑添加.awaitTermination()为最后一个链接语句:
df.writestream
.format("delta")
.foreachBatch(WriteStreamToDelta)
.Start()
.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
878 次 |
| 最近记录: |