Agn*_*dra 2 scala angularjs apache-zeppelin
我正在尝试使用zeppelin来绘制实时图表.我每分钟对推文进行情绪分析.我可以静态查询并绘制图表.但我希望这是动态完成的.我是zeppelin的新手,对angularJS知之甚少.这个问题的正确方法应该是什么?
val final_score=uni_join.map{case((year,month,day,hour,minutes),(tweet_count,sentiment))=>(year, month, day, hour, minutes(sentiment/tweet_count).ceil)}
final_score.saveToCassandra("twitter", "score",writeConf = WriteConf(ttl = TTLOption.constant(1000)))
final_score.foreachRDD(score => {
val rowRDD =score.map{case(year,month,day,hour,minutes,sentiment) =>(year,month,day,hour,minutes,sentiment) }
val tempDF = sqlContext.createDataFrame(rowRDD)
z.angularBindGlobal("stream", parsed) //to bind parsed to stream.
tempDF.registerTempTable("realTimeTable")
})
Run Code Online (Sandbox Code Playgroud)
对上表进行查询,我就能得到图表.但我想每分钟动态更新图表,以便与情绪评分保持同步.先谢谢.[更新] zeppelin笔记本的角度部分如下:
%angular
<div id="graph" style="height: 100%; width: 100%">
<canvas id="myChart" width="400" height="400"></canvas>
<div id="legendDiv"></div>
</div>
<script>
function initMap() {
var colorList = ["#fde577", "#ff6c40", "#c72a40", "#520833", "#a88399"]
var el = angular.element($('#stream'));
console.log("El is "+el) //returns el as object
angular.element(el).ready(function() {
console.log('Hello')
window.locationWatcher =el.$scope.$watch('stream', function(new, old){
console.log('changed');}, true)})
</script>
Run Code Online (Sandbox Code Playgroud)
但是运行此代码会继续返回以下错误.
vendor.js:29 jQuery.Deferred exception: Cannot read property '$watch' of undefined TypeError: Cannot read property '$watch' of undefined
Run Code Online (Sandbox Code Playgroud)
我使用的火花版本是1.6而Zeppelin是0.6
自0.6.3版本以来,spark-highcharts支持Spark Structured Streaming.
对于structuredDataFrame后聚合,在一个Zeppelin段落中使用以下代码.该输出outputmode可以是append或complete取决于structureDataFrame是如何聚集.
import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._
val query = highcharts(
structuredDataFrame.seriesCol("country")
.series("x" -> "year", "y" -> "stockpile")
.orderBy(col("year")), z, "append")
Run Code Online (Sandbox Code Playgroud)
并在下一段中提供以下代码.当有新数据进入structureDataFrame时,将更新本段中的图表.
StreamingChart(z)
Run Code Online (Sandbox Code Playgroud)
运行以下代码以停止更新图表.
query.stop()
Run Code Online (Sandbox Code Playgroud)
以下是生成structureDataFrame的示例.
spark.conf.set("spark.sql.streaming.checkpointLocation","/usr/zeppelin/checkpoint")
case class NuclearStockpile(country: String, stockpile: Int, year: Int)
val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640,
1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126,
27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662,
26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605,
24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586,
22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950,
10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104).
zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))
val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322,
4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478,
15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049,
33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000,
35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000,
21000, 20000, 19000, 18000, 18000, 17000, 16000).
zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2))
input.addData(USA.take(30) ++ USSR.take(30))
val structureDataFrame = input.toDF
Run Code Online (Sandbox Code Playgroud)
并且可以模拟以下代码来更新图表.运行以下代码时,将更新图表.
input.addData(USA.drop(30) ++ USSR.drop(30))
Run Code Online (Sandbox Code Playgroud)
注意:使用Zeppelin 0.6.2和Spark 2.0的示例
注意:请检查highcharts许可证以了解商业用途
| 归档时间: |
|
| 查看次数: |
2617 次 |
| 最近记录: |