相关疑难解决方法(0)

如何动态地向Source添加元素?

我有示例代码生成一个未绑定的源并使用它:

对象Main {

 def main(args : Array[String]): Unit = {

  implicit val system = ActorSystem("Sys")
  import system.dispatcher

  implicit val materializer = ActorFlowMaterializer()

  val source: Source[String] = Source(() => {
     Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
    })

  source.runForeach((item:String) => { println(item) })
  .onComplete{ _ => system.shutdown() }
 }
Run Code Online (Sandbox Code Playgroud)

}

我想创建实现的类:

trait MySources {
    def addToSource(item: String)
    def getSource() : Source[String]
}
Run Code Online (Sandbox Code Playgroud)

我需要使用多个线程,例如:

class MyThread(mySources: MySources) extends Thread {
  override def run(): Unit = {
    for(i <- 1 to 1000000) { // here will …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream

23
推荐指数
2
解决办法
6046
查看次数

使用Spark DStream作为Akka流的Source的惯用方法

我正在构建一个REST API,它在Spark集群中启动一些计算,并使用一个分块的结果流进行响应.鉴于Spark流与计算结果,我可以使用

dstream.foreachRDD()
Run Code Online (Sandbox Code Playgroud)

从Spark发送数据.我正在使用akka-http发送分块的HTTP响应:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
Run Code Online (Sandbox Code Playgroud)

为简单起见,我试图首先使纯文本工作,稍后将添加JSON编组.

但是使用Spark DStream作为Akka流源的惯用方法是什么?我认为我应该可以通过套接字来实现它,但由于Spark驱动程序和REST端点位于相同的JVM上,因此开放套接字似乎有点过分.

scala akka-stream spark-streaming akka-http

5
推荐指数
2
解决办法
1629
查看次数

标签 统计

akka-stream ×2

scala ×2

akka ×1

akka-http ×1

spark-streaming ×1