Jupyter Notebook 上未显示结构化流输出

Abd*_*eeb 5 apache-spark spark-streaming pyspark jupyter-notebook spark-structured-streaming

我有两本笔记本。第一个笔记本正在使用 tweepy 从 twitter 读取推文并将其写入套接字。其他笔记本正在使用 spark 结构化流 (Python) 从该套接字读取推文并将其结果写入控制台。不幸的是,我没有在 jupyter 控制台上获得输出。代码在 pycharm 上运行良好。

spark = SparkSession \
    .builder \
    .appName("StructuredStreaming") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# This is Spark Structured Streaming Code which is reading streams from twitter and showing them on console.
tweets = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 7000) \
    .load()

query = tweets \
    .writeStream \
    .option("truncate", "false") \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

Dom*_*iak 8

我不确定 Jupyter Notebook 是否可以实现这一点。但是,您可以使用内存输出来实现类似的结果。这在模式中很简单complete,但可能需要对append.

对于complete模式

complete输出模式下,您的查询应该大致如下所示:

query = tweets \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("your_query_name") \
    .start()
Run Code Online (Sandbox Code Playgroud)

query.awaitTermination()注意最后没有。现在,查询your_query_name另一个单元格中的临时表,并根据需要观察不断更新的结果:

from IPython.display import display, clear_output

while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT * FROM your_query_name').show())
    sleep(1)
Run Code Online (Sandbox Code Playgroud)

对于append模式

如果您想使用append输出模式,则必须使用水印。您也将无法使用聚合,因此您的代码可能需要进行一些进一步的更改。

query = tweets \
    .withWatermark("timestampColumn", "3 minutes")
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("your_query_name") \
    .start()
Run Code Online (Sandbox Code Playgroud)

显示代码保持不变。您还可以query.lastProgress以类似的方式显示更详细的信息。

灵感和参考