Spark Streaming awaitTermination in Jupyter Notebook

Kee*_*pan 3 apache-spark spark-streaming pyspark

I am following the code from the Apache Spark Definitive Guide and have run into a problem: when the (currently commented-out) line awaitTermination() is included, the code below does not print any results in Jupyter Notebook. With awaitTermination() in the code, the Jupyter kernel is busy and may stay busy indefinitely.

Without awaitTermination, the code works fine.

Can someone explain this behavior, and how can I work around it?

# read the static version of the dataset once to infer the schema
static = spark.read.json(r"/resources/activity-data/")
dataSchema = static.schema

# stream the same files back, one file per trigger, using the inferred schema
streaming = (spark
             .readStream
             .schema(dataSchema)
             .option("maxFilesPerTrigger", 1)
             .json(r"/resources/activity-data/")
            )

# running count of events per activity label ("gt" column)
activityCounts = streaming.groupBy("gt").count()
spark.conf.set("spark.sql.shuffle.partitions", 5)

# write the complete result table to an in-memory sink, queryable as "activity_counts"
activityQuery = (activityCounts
                 .writeStream
                 .queryName("activity_counts")
                 .format("memory")
                 .outputMode("complete")
                 .start()
                )
#activityQuery.awaitTermination()
#activityQuery.stop()

# poll the in-memory table while the stream runs in the background
from time import sleep
for x in range(5):
    spark.table("activity_counts").show()
    sleep(1)

the*_*hon 5

Yes; see this documentation for reference (https://docs.databricks.com/spark/latest/structed-streaming/Production.html). Page 352 of the Spark TDG also explains this.

Spark Streaming jobs are continuous applications, and in production activityQuery.awaitTermination() is required because it prevents the driver process from terminating while the stream is active (in the background).

If the driver is killed, the application is killed with it, so activityQuery.awaitTermination() acts as a kind of fail-safe. If you want to shut the stream down inside Jupyter, you can run activityQuery.stop() to stop the query and keep testing... I hope this helps.
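If you need the fail-safe of awaitTermination() without blocking the Jupyter kernel indefinitely, one workaround (a minimal sketch, not from the book) is to pass a timeout in seconds; the call then returns True if the query terminated within that window and False otherwise:

# block for at most 30 seconds, then get control of the kernel back
finished = activityQuery.awaitTermination(30)
if not finished:
    activityQuery.stop()  # stream is still running; stop it explicitly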

activityDataSample = 'path/to/data'
spark.conf.set("spark.sql.shuffle.partitions", 8)
static = spark.read.json(activityDataSample)  # static read just to infer the schema
dataSchema = static.schema
static.printSchema()

streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
.json(activityDataSample)

activityCounts = streaming.groupBy("gt").count()

activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("memory").outputMode("complete")\
.start()

# simulates a continuous stream for testing (ctrl-C to kill app)
'''
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("console").outputMode("complete")\
.start()
activityQuery.awaitTermination()
'''

spark.streams.active # query stream is active
[<pyspark.sql.streaming.StreamingQuery at 0x28a4308d320>]

from time import sleep
for x in range(3):
    spark.sql("select * from activity_counts").show(3)
    sleep(2)
+---+-----+
| gt|count|
+---+-----+
+---+-----+

+--------+-----+
|      gt|count|
+--------+-----+
|    bike|10796|
|    null|10449|
|stairsup|10452|
+--------+-----+
only showing top 3 rows

+--------+-----+
|      gt|count|
+--------+-----+
|    bike|10796|
|    null|10449|
|stairsup|10452|
+--------+-----+
only showing top 3 rows

activityQuery.stop() # stop query stream
spark.streams.active # no active streams anymore
[]
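If the cell that starts the stream gets re-run without stopping the old query first, several queries can pile up in spark.streams.active; a quick cleanup pattern (a sketch of my own, assuming a live SparkSession) is:

# stop every streaming query still active in this SparkSession
for query in spark.streams.active:
    print(query.name, query.status)  # name and current status of each live query
    query.stop()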