我正在尝试使用自定义接收器编写Spark Streaming应用程序.我应该通过提供具有预定义间隔的随机值来模拟实时输入数据.(简化)接收器如下所示,使用下面的Spark Streaming应用程序代码:
class SparkStreamingReceiver extends Actor with ActorHelper {
private val random = new Random()
override def preStart = {
context.system.scheduler.schedule(500 milliseconds, 1000 milliseconds)({
self ! ("string", random.nextGaussian())
})
}
override def receive = {
case data: (String, Double) => {
store[(String, Double)](data)
}
}
}
Run Code Online (Sandbox Code Playgroud)
val conf: SparkConf = new SparkConf()
conf.setAppName("Spark Streaming App")
.setMaster("local")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
val randomValues: ReceiverInputDStream[(String, Double)] =
ssc.actorStream[(String,Double)](Props(new SparkStreamingReceiver()), "Receiver")
randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues")
Run Code Online (Sandbox Code Playgroud)
运行此代码,我看到接收器正在工作(存储项目,收到单个日志条目).但是,saveAsTextFiles永远不会输出值. …