Adi*_*ber · Tags: scala, cloudera, apache-spark, spark-streaming
I'm using Spark 1.6.0 with Cloudera 5.8.3.
I have a DStream object with a large number of transformations defined on it:
val stream = KafkaUtils.createDirectStream[...](...)
val mappedStream = stream.transform { ... }.map { ... }
mappedStream.foreachRDD { ... }
mappedStream.foreachRDD { ... }
mappedStream.map { ... }.foreachRDD { ... }
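Is there a way to register one final foreachRDD that is guaranteed to run last, and only after all of the foreachRDD calls above have finished executing?
In other words, at the moment the Spark UI shows the job as completed, that is exactly when I want to run a lightweight function.
Is there anything in the API that lets me do this?
Thanks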
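Using a StreamingListener should solve this for you: its onBatchCompleted callback is invoked once per batch, after all of that batch's output operations (your foreachRDD calls) have completed.
(Sorry, this is a Java example.)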
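import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;

ssc.addStreamingListener(new JobListener());
// ...
class JobListener implements StreamingListener {
    // Called once per batch, after every output operation of that batch has finished
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        // totalDelay() is a Scala Option; it is set once the batch has finished processing
        System.out.println("Batch completed, total delay: " + batchCompleted.batchInfo().totalDelay().get().toString() + " ms");
    }
    /*
    Other StreamingListener callbacks snipped. With Spark 1.6 (Scala 2.10/2.11),
    a Java class must implement all of them, even as empty methods.
    */
}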
https://gist.github.com/akhld/b10dc491aad1a2007183
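Since the question uses Scala, here is a minimal Scala sketch of the same listener idea, assuming the Spark 1.6 StreamingListener trait from org.apache.spark.streaming.scheduler. In Scala the trait's callbacks have empty default implementations, so only onBatchCompleted needs to be overridden. The class name BatchDoneListener and its onBatchDone parameter are hypothetical, chosen for illustration. The callback runs on the driver, on Spark's listener thread, so it should stay lightweight.

```scala
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical helper: invokes onBatchDone on the driver once per batch,
// after all output operations (foreachRDD etc.) of that batch have completed.
class BatchDoneListener(onBatchDone: Time => Unit) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // batchTime identifies which batch just finished
    onBatchDone(batchCompleted.batchInfo.batchTime)
  }
}
```

It would be registered on the StreamingContext before ssc.start(), for example: ssc.addStreamingListener(new BatchDoneListener(t => println(s"All output operations for batch $t finished"))).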