如何在Zeppelin中获得控制台流水槽的输出?

m01*_*m01 9 apache-spark pyspark apache-zeppelin spark-structured-streaming

从Zeppelin运行时,我正在努力让接收console器使用PySpark Structured Streaming.基本上,我没有看到任何结果打印到屏幕或我发现的任何日志文件.

我的问题:有没有人有一个使用PySpark Structured Streaming和一个产生Apache Zeppelin可见输出的接收器的工作示例?理想情况下它也会使用套接字源,因为它很容易测试.

我正在使用:

  • Ubuntu 16.04
  • 火花2.2.0彬hadoop2.7
  • 齐柏林0.7.3彬所有
  • Python3

我的代码基于structured_network_wordcount.py示例.它从PySpark shell(./bin/pyspark --master local[2])运行时起作用; 我看到每批表.

%pyspark
# structured streaming
from pyspark.sql.functions import *
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .option('includeTimestamp', 'true')\
    .load()

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, '10 seconds', '1 seconds'),
    words.word
).count().orderBy('window')

# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .option('truncate', 'false')\
    .start()

print("Starting...")
query.awaitTermination(20)
Run Code Online (Sandbox Code Playgroud)

我希望看到每个批次的结果打印输出,但我只是看到Starting...,然后False,返回值query.awaitTermination(20).

在一个单独的终端中,我nc -lk 9999在上面运行时将一些数据输入到netcat会话中.

use*_*411 12

控制台接收器不适合基于交互式笔记本的工作流程.即使在可以捕获输出的Scala中,它也需要awaitTermination在同一段中调用(或等效),从而有效地阻止了音符.

%spark

spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("includeTimestamp", "true")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination() // Block execution, to force Zeppelin to capture the output
Run Code Online (Sandbox Code Playgroud)

链接awaitTermination可以替换为同一段中的独立调用也可以工作:

%spark

val query = df
  .writeStream
  ...
  .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

没有它,Zeppelin没有理由等待任何输出.PySpark只是增加了另一个问题 - 间接执行.因此,即使阻止查询也无法帮助您.

此外,来自流的连续输出在浏览笔记时可能导致渲染问题和内存问题(可能可以使用Zeppelin显示系统InterpreterContext或REST API,以实现更明智的行为,其中输出被覆盖或定期清除).

使用Zeppelin进行测试的更好选择是内存接收器.这样您就可以在不阻塞的情况下启动查询:

%pyspark

query = (windowedCounts
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("some_name")
  .start())
Run Code Online (Sandbox Code Playgroud)

并在另一段中按需查询结果:

%pyspark

spark.table("some_name").show()
Run Code Online (Sandbox Code Playgroud)

它可以与反应流或类似解决方案耦合,以提供基于间隔的更新.

虽然PySpark不支持查询监听器,并且需要一些代码来将事物粘合在一起,但也可以使用StreamingQueryListenerPy4j回调来rxonQueryProgress事件耦合.Scala界面:

package com.example.spark.observer

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

trait PythonObserver {
  def on_next(o: Object): Unit
}

class PythonStreamingQueryListener(observer: PythonObserver) 
    extends StreamingQueryListener {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    observer.on_next(event)
  }
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Run Code Online (Sandbox Code Playgroud)

构建一个jar,调整构建定义以反映所需的Scala和Spark版本:

scalaVersion := "2.11.8"  

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)
Run Code Online (Sandbox Code Playgroud)

把它放在Spark类路径上,补丁StreamingQueryManager:

%pyspark

from pyspark.sql.streaming import StreamingQueryManager
from pyspark import SparkContext

def addListener(self, listener):
    jvm = SparkContext._active_spark_context._jvm
    jlistener = jvm.com.example.spark.observer.PythonStreamingQueryListener(
        listener
    )
    self._jsqm.addListener(jlistener)
    return jlistener


StreamingQueryManager.addListener = addListener
Run Code Online (Sandbox Code Playgroud)

启动回调服务器:

%pyspark

sc._gateway.start_callback_server()
Run Code Online (Sandbox Code Playgroud)

并添加监听器:

%pyspark

from rx.subjects import Subject

class StreamingObserver(Subject):
    class Java:
        implements = ["com.example.spark.observer.PythonObserver"]

observer = StreamingObserver()
spark.streams.addListener(observer)
Run Code Online (Sandbox Code Playgroud)

最后,您可以使用subscribe和阻止执行:

%pyspark

(observer
    .map(lambda p: p.progress().name())
    # .filter() can be used to print only for a specific query
    .subscribe(lambda n: spark.table(n).show() if n else None))
input()  # Block execution to capture the output 
Run Code Online (Sandbox Code Playgroud)

在开始流式查询后,应执行最后一步.

也可以跳过rx并使用这样的最小观察者:

class StreamingObserver(object):
    class Java:
        implements = ["com.example.spark.observer.PythonObserver"]

    def on_next(self, value):
        try:
            name = value.progress().name()
            if name:
                spark.table(name).show()
        except: pass
Run Code Online (Sandbox Code Playgroud)

它提供的控制比一点少Subject(一点需要注意,这可能会干扰其他代码打印到stdout,并且只能通过删除监听器来停止.一旦你完成,Subject你可以很容易地dispose subscribed观察,但是否则应该更多或者不太相同.

请注意,任何阻塞操作都足以捕获侦听器的输出,并且不必在同一单元中执行.例如

%pyspark

observer = StreamingObserver()
spark.streams.addListener(observer)
Run Code Online (Sandbox Code Playgroud)

%pyspark

import time

time.sleep(42)
Run Code Online (Sandbox Code Playgroud)

将以类似的方式工作,打印表定义的时间间隔.

为了完整性,您可以实施StreamingQueryManager.removeListener.