Post by sno*_*oze

Custom receiver stops working in Spark Streaming

I am trying to write a Spark Streaming application with a custom receiver. It is supposed to simulate real-time input data by producing random values at a fixed interval. The (simplified) receiver looks like this, followed by the Spark Streaming application code below:

import scala.concurrent.duration._
import scala.util.Random

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

class SparkStreamingReceiver extends Actor with ActorHelper {

  private val random = new Random()

  override def preStart() = {
    import context.dispatcher // ExecutionContext for the scheduler
    // Push a random Gaussian value to Spark every second, starting after 500 ms
    context.system.scheduler.schedule(500.milliseconds, 1000.milliseconds) {
      self ! (("string", random.nextGaussian()))
    }
  }

  override def receive = {
    case data: (String, Double) => store[(String, Double)](data)
  }
}
val conf: SparkConf = new SparkConf()
conf.setAppName("Spark Streaming App")
    .setMaster("local")

val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))

val randomValues: ReceiverInputDStream[(String, Double)] =
    ssc.actorStream[(String,Double)](Props(new SparkStreamingReceiver()), "Receiver")

randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues")

Running this code, I can see that the receiver is working (log entries appear for each stored item). However, saveAsTextFiles never writes out any values. …
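A likely cause, assuming standard Spark local-mode semantics: `setMaster("local")` starts the application with a single thread, and a receiver-based stream occupies one thread permanently, leaving none for batch processing. A sketch of the change (the rest of the setup stays as above):

```scala
import org.apache.spark.SparkConf

// Assumption: with master "local" the only available thread is consumed by
// the receiver, so batches are never processed and saveAsTextFiles produces
// nothing. Requesting at least two local threads leaves one for the receiver
// and one for computation.
val conf: SparkConf = new SparkConf()
conf.setAppName("Spark Streaming App")
    .setMaster("local[2]") // "local[*]" (all available cores) also works
```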

scala apache-spark spark-streaming

3 votes · 1 answer · 1344 views
