自定义接收器在Spark Streaming中停止工作

sno*_*oze 3 scala apache-spark spark-streaming

我正在尝试使用自定义接收器编写Spark Streaming应用程序.我应该通过提供具有预定义间隔的随机值来模拟实时输入数据.(简化)接收器如下所示,使用下面的Spark Streaming应用程序代码:

class SparkStreamingReceiver extends Actor with ActorHelper {

  private val random = new Random()

  override def preStart = {
    context.system.scheduler.schedule(500 milliseconds, 1000 milliseconds)({
        self ! ("string", random.nextGaussian())
    })
  }

  override def receive = {
    case data: (String, Double) => {
      store[(String, Double)](data)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)
val conf: SparkConf = new SparkConf()
conf.setAppName("Spark Streaming App")
    .setMaster("local")

val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))

val randomValues: ReceiverInputDStream[(String, Double)] =
    ssc.actorStream[(String,Double)](Props(new SparkStreamingReceiver()), "Receiver")

randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues")
Run Code Online (Sandbox Code Playgroud)

运行此代码,我看到接收器正在工作(存储项目,收到单个日志条目).但是,saveAsTextFiles永远不会输出值.

我可以通过将master更改为使用两个线程(local[2])运行来解决此问题,但如果我注册了我的接收器的另一个实例(我打算这样做),它会重新出现.更具体地说,我需要至少有一个线程超过我注册的自定义接收器的数量以获得任何输出.

在我看来好像工作线程被接收器停止了.

任何人都可以解释这种效果,并可能解释如何修复我的代码?

Tat*_*Das 7

每个接收器使用一个计算槽.因此2个接收器将需要2个计算插槽.如果所有计算槽都由接收器占用,那么就没有剩余的槽来处理数据.这就是为什么带有1个接收器的"本地"模式和带有2个接收器的"本地[2]"停止处理的原因.