我有示例代码生成一个未绑定的源并使用它:
对象Main {
def main(args : Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val source: Source[String] = Source(() => {
Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
})
source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}
Run Code Online (Sandbox Code Playgroud)
}
我想创建实现的类:
trait MySources {
def addToSource(item: String)
def getSource() : Source[String]
}
Run Code Online (Sandbox Code Playgroud)
我需要使用多个线程,例如:
class MyThread(mySources: MySources) extends Thread {
override def run(): Unit = {
for(i <- 1 to 1000000) { // here will …Run Code Online (Sandbox Code Playgroud) 我正在构建一个REST API,它在Spark集群中启动一些计算,并使用一个分块的结果流进行响应.鉴于Spark流与计算结果,我可以使用
dstream.foreachRDD()
Run Code Online (Sandbox Code Playgroud)
从Spark发送数据.我正在使用akka-http发送分块的HTTP响应:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
Run Code Online (Sandbox Code Playgroud)
为简单起见,我试图首先使纯文本工作,稍后将添加JSON编组.
但是使用Spark DStream作为Akka流源的惯用方法是什么?我认为我应该可以通过套接字来实现它,但由于Spark驱动程序和REST端点位于相同的JVM上,因此开放套接字似乎有点过分.