如何在pyspark中使用foreach接收器?

xia*_*ing 2 apache-spark pyspark pyspark-sql spark-structured-streaming

如何foreach在 Python Spark 结构化流中使用以触发输出操作。

query = wordCounts\
    .writeStream\
    .outputMode('update')\
    .foreach(func)\
    .start()

def func():
    ops(wordCounts)
Run Code Online (Sandbox Code Playgroud)

小智 5

Spark 2.4.0 中添加了对 Python 中 foreach 接收器的支持,并且文档已更新:http : //spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-和-foreachbatch

确保您拥有该版本,您现在可以执行以下操作:

def process_row(row):
    # Process row
    pass

query = streamingDF.writeStream.foreach(process_row).start()  
Run Code Online (Sandbox Code Playgroud)