I am running Structured Streaming (Spark 2.1.0 from Zeppelin 0.7) to process data from Kafka, and I am trying to visualize the streaming result via spark.sql
as follows:
%spark2
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession
  .builder()
  .appName("Spark structured streaming Kafka example")
  .master("yarn")
  .getOrCreate()
import spark.implicits._

val inputstream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "n11.hdp.com:6667,n12.hdp.com:6667,n13.hdp.com:6667,n10.hdp.com:6667,n9.hdp.com:6667")
  .option("subscribe", "st")
  .load()

// Parse the comma-separated Kafka value into typed columns,
// then aggregate total traffic per pre_post_paid key
val stream = inputstream.selectExpr("CAST(value AS STRING)").as[String].select(
    expr("(split(value, ','))[0]").cast("string").as("pre_post_paid"),
    expr("(split(value, ','))[1]").cast("double").as("DataUpload"),
    expr("(split(value, ','))[2]").cast("double").as("DataDownload"))
  .filter("DataUpload is not null and DataDownload is not null")
  .groupBy("pre_post_paid").agg(sum("DataUpload") + sum("DataDownload") as "size")

// Write the running aggregate to an in-memory table named "test"
val query = stream.writeStream
  .format("memory")
  .outputMode("complete")
  .queryName("test")
  .start()
After it is running, I query the "test" table as follows:
%sql
select *
from test
It only updates when I run the paragraph manually. My question is: how can I make it update automatically as new data is processed (a streaming visualization), as in the following example:
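For what it's worth, one workaround I have seen (a sketch under assumptions, not an approach confirmed by the Spark or Zeppelin docs for this exact setup) is to poll the in-memory sink from a separate `%spark2` paragraph and re-render it with Zeppelin's `ZeppelinContext` (`z`). The table name `test` matches the `queryName` used above; the refresh count and sleep interval are arbitrary choices:

```scala
%spark2
// Hypothetical polling loop: re-query the in-memory "test" table every few
// seconds and re-render the snapshot with ZeppelinContext (z).
// Assumes the streaming query writing to "test" is already running.
for (_ <- 1 to 10) {            // refresh 10 times, then stop
  val snapshot = spark.sql("select * from test")
  z.show(snapshot)              // render the current aggregate as a table/chart
  Thread.sleep(5000)            // wait 5 s between refreshes
}
```

Alternatively, Zeppelin's per-notebook cron scheduler can be used to re-run the `%sql` paragraph on a fixed interval, which keeps the chart in the `%sql` paragraph itself.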