How to plot a real-time chart in Zeppelin?

Agn*_*dra 2 scala angularjs apache-zeppelin

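I'm trying to use Zeppelin to plot a real-time chart. I run sentiment analysis on tweets every minute. I can query the results statically and plot a chart, but I want this to happen dynamically. I'm new to Zeppelin and know little about AngularJS. What is the right approach to this problem?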

import com.datastax.spark.connector.streaming._
import com.datastax.spark.connector.writer.{TTLOption, WriteConf}

// Average sentiment per minute, rounded up
val final_score = uni_join.map { case ((year, month, day, hour, minutes), (tweet_count, sentiment)) =>
  (year, month, day, hour, minutes, (sentiment.toDouble / tweet_count).ceil)
}

// Persist every batch to Cassandra; rows expire after 1000 seconds
final_score.saveToCassandra("twitter", "score", writeConf = WriteConf(ttl = TTLOption.constant(1000)))

final_score.foreachRDD { score =>
  val tempDF = sqlContext.createDataFrame(score).toDF("year", "month", "day", "hour", "minutes", "sentiment")
  z.angularBindGlobal("stream", score.collect()) // bind the latest batch to the Angular variable "stream"
  tempDF.registerTempTable("realTimeTable")
}
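The score in final_score is the summed sentiment for a minute divided by that minute's tweet count, rounded up. As a plain-Scala sketch without Spark, assuming sentiment is the summed sentiment and tweet_count the number of tweets (MinuteScore and averageScore are hypothetical names), the same arithmetic on a local Map looks like this. Converting to Double before dividing matters: with two Ints, / truncates before the ceiling is ever taken.

```scala
object MinuteScore {
  // (year, month, day, hour, minute), mirroring the key used in uni_join
  type MinuteKey = (Int, Int, Int, Int, Int)

  // Per-minute average sentiment, rounded up, as in final_score.
  // Input value is (tweetCount, summedSentiment); dividing as Double
  // avoids Int truncation (7 / 2 would be 3, not 3.5).
  def averageScore(counts: Map[MinuteKey, (Int, Int)]): Map[MinuteKey, Double] =
    counts.map { case (key, (tweetCount, sentiment)) =>
      key -> math.ceil(sentiment.toDouble / tweetCount)
    }
}
```

For example, a minute with 2 tweets and a summed sentiment of 7 scores 4.0, while a summed sentiment of -3 scores -1.0.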

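Querying the table above gives me a chart, but I want the chart to update dynamically every minute so that it stays in sync with the sentiment scores. Thanks in advance.

[Update] The Angular part of the Zeppelin notebook is as follows: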

%angular
<div id="graph"  style="height: 100%; width: 100%">
<canvas id="myChart" width="400" height="400"></canvas>
<div id="legendDiv"></div>
</div>
<script>
function initMap() {
    var colorList = ["#fde577", "#ff6c40", "#c72a40", "#520833", "#a88399"];

    var el = angular.element($('#stream'));
    console.log("El is " + el); // logs el as an object

    angular.element(el).ready(function() {
        console.log('Hello');
        window.locationWatcher = el.$scope.$watch('stream', function(newValue, oldValue) {
            console.log('changed');
        }, true);
    });
}

initMap();
</script>

But running this code keeps returning the following error:

vendor.js:29 jQuery.Deferred exception: Cannot read property '$watch' of  undefined TypeError: Cannot read property '$watch' of undefined

I'm using Spark 1.6 and Zeppelin 0.6.

Roc*_*ang 7

Since version 0.6.3, spark-highcharts supports Spark Structured Streaming.

For an aggregated structuredDataFrame, use the following code in one Zeppelin paragraph. The output mode can be append or complete, depending on how structuredDataFrame is aggregated.

import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._
import org.apache.spark.sql.functions.col

val query = highcharts(
  structuredDataFrame.seriesCol("country")
    .series("x" -> "year", "y" -> "stockpile")
    .orderBy(col("year")), z, "append")
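Going by the API, seriesCol("country") appears to produce one chart series per distinct country, with series("x" -> "year", "y" -> "stockpile") choosing which columns become each point's x and y. The sketch below is a simplified plain-Scala model of that grouping, not the spark-highcharts implementation; Row and toSeries are hypothetical names, and sorting by year stands in for orderBy(col("year")).

```scala
object SeriesModel {
  // One input row, shaped like the NuclearStockpile records (hypothetical model type)
  final case class Row(country: String, stockpile: Int, year: Int)

  // Group rows into one series per country; each point is (x = year, y = stockpile),
  // sorted by year to mirror orderBy(col("year")).
  def toSeries(rows: Seq[Row]): Map[String, Seq[(Int, Int)]] =
    rows.groupBy(_.country).map { case (country, rs) =>
      country -> rs.map(r => (r.year, r.stockpile)).sortBy(_._1)
    }
}
```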

Put the following code in the next paragraph. Whenever new data arrives in structuredDataFrame, the chart in this paragraph is updated.

StreamingChart(z)
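The output mode follows Spark Structured Streaming semantics: in complete mode, which requires an aggregation, every trigger emits the full result table, while in append mode only the rows added since the last trigger are emitted. The plain-Scala model below (OutputModes, appendChart and completeChart are hypothetical names, not Spark APIs) shows what a chart would hold after each micro-batch under each mode. It assumes the chart accumulates appended rows and replaces its data on each complete update, and uses "max stockpile per country" as a stand-in aggregation.

```scala
object OutputModes {
  final case class Point(country: String, stockpile: Int, year: Int)

  // append (no aggregation): each trigger delivers only the newest batch,
  // so the chart keeps everything it has received so far.
  def appendChart(batches: Seq[Seq[Point]]): Seq[Seq[Point]] =
    batches.scanLeft(Seq.empty[Point])(_ ++ _).tail

  // complete (aggregated query): each trigger re-emits the whole result table
  // computed over all data seen so far, so the chart simply replaces its data.
  def completeChart(batches: Seq[Seq[Point]]): Seq[Map[String, Int]] =
    batches.scanLeft(Seq.empty[Point])(_ ++ _).tail.map { seen =>
      seen.groupBy(_.country).map { case (c, ps) => c -> ps.map(_.stockpile).max }
    }
}
```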

Run the following code to stop updating the chart.

query.stop()

Here is an example that generates structuredDataFrame.

spark.conf.set("spark.sql.streaming.checkpointLocation","/usr/zeppelin/checkpoint")

case class NuclearStockpile(country: String, stockpile: Int, year: Int)

val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640,
  1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126,
  27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662,
  26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605,
  24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586,
  22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950,
  10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104).
    zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))

val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322,
  4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478,
  15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049,
  33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000,
  35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000,
  21000, 20000, 19000, 18000, 18000, 17000, 16000).
    zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2))

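// An in-memory streaming source; it needs an Encoder and an implicit SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._
implicit val sqlCtx: org.apache.spark.sql.SQLContext = spark.sqlContext

val input = MemoryStream[NuclearStockpile]
input.addData(USA.take(30) ++ USSR.take(30))
val structuredDataFrame = input.toDF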

The following code simulates new data arriving. When it runs, the chart is updated.

input.addData(USA.drop(30) ++ USSR.drop(30))
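The two addData calls split each 67-year series (1940 to 2006) at index 30, so the first micro-batch covers 1940 to 1969 and the second 1970 to 2006. The plain-Scala check below reproduces that split without Spark; Split, years, firstBatch and secondBatch are hypothetical names, and the all-zero values are placeholders because only the length of the series matters here.

```scala
object Split {
  final case class NuclearStockpile(country: String, stockpile: Int, year: Int)

  // Placeholder values; only the length (67 entries for 1940 to 2006) matters
  val years: Seq[NuclearStockpile] =
    Seq.fill(67)(0).zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))

  val firstBatch: Seq[NuclearStockpile] = years.take(30)  // what the first addData sends
  val secondBatch: Seq[NuclearStockpile] = years.drop(30) // what the second addData sends
}
```

Note that zip stops at the shorter input, so each value list must contain exactly 67 entries; both lists in the example above do.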

Note: the example uses Zeppelin 0.6.2 and Spark 2.0.

Note: check the Highcharts license before any commercial use.