xia*_*ing 2 apache-spark pyspark pyspark-sql spark-structured-streaming
如何foreach在 Python Spark 结构化流中使用以触发输出操作。
query = wordCounts\
.writeStream\
.outputMode('update')\
.foreach(func)\
.start()
def func():
ops(wordCounts)
Run Code Online (Sandbox Code Playgroud)
小智 5
Spark 2.4.0 中添加了对 Python 中 foreach 接收器的支持,并且文档已更新:http : //spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-和-foreachbatch
确保您拥有该版本,您现在可以执行以下操作:
def process_row(row):
# Process row
pass
query = streamingDF.writeStream.foreach(process_row).start()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3834 次 |
| 最近记录: |