使用Spark DStream作为Akka流的Source的惯用方法

use*_*104 5 scala akka-stream spark-streaming akka-http

我正在构建一个REST API,它在Spark集群中启动一些计算,并使用一个分块的结果流进行响应.鉴于Spark流与计算结果,我可以使用

dstream.foreachRDD()
Run Code Online (Sandbox Code Playgroud)

从Spark发送数据.我正在使用akka-http发送分块的HTTP响应:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
Run Code Online (Sandbox Code Playgroud)

为简单起见,我试图首先使纯文本工作,稍后将添加JSON编组.

但是使用Spark DStream作为Akka流源的惯用方法是什么?我认为我应该可以通过套接字来实现它,但由于Spark驱动程序和REST端点位于相同的JVM上,因此开放套接字似乎有点过分.

PH8*_*H88 8

在问题发生时不确定api的版本.但是现在,使用akka-stream 2.0.3,我相信你可以这样做:

val source = Source
  .actorRef[T](/* buffer size */ 100, OverflowStrategy.dropHead)
  .mapMaterializedValue[Unit] { actorRef =>
    dstream.foreach(actorRef ! _)
  }
Run Code Online (Sandbox Code Playgroud)


Ram*_*gil 3

编辑:这个答案仅适用于旧版本的 Spark 和 akka。PH88的答案是最近版本的正确方法。

您可以使用提供源的中间体akka.actor.Actor(类似于此问题)。下面的解决方案不是“反应式”的,因为底层 Actor 需要维护 RDD 消息的缓冲区,如果下游 http 客户端没有足够快地消耗块,则该缓冲区可能会被删除。但无论实现细节如何,都会出现此问题,因为您无法将 akka 流背压的“节流”连接到 DStream 以减慢数据速度。这是由于 DStream 没有实现org.reactivestreams.Publisher.

基本拓扑结构为:

DStream --> Actor with buffer --> Source
Run Code Online (Sandbox Code Playgroud)

要构建此拓扑,您必须创建一个类似于此处实现的 Actor :

//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props 
Run Code Online (Sandbox Code Playgroud)

基于JobManager创建ByteStrings(消息)的流Source。另外,将 转换ByteStringHttpEntity.ChunkStreamPartHttpResponse 所需的内容:

import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString

type Message = ByteString

val messageToChunkPart = 
  Flow[Message].map(HttpEntity.ChunkStreamPart(_))

//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] = 
  Source(ActorPublisher[Message](actorRef)) via messageToChunkPart
Run Code Online (Sandbox Code Playgroud)

将 Spark DStream 链接到 Actor,以便将每个传入的 RDD 转换为 ByteString 的 Iterable,然后转发到 Actor:

import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD

val dstream : DStream = ???

//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???

def sendMessageToActor(message : Message) = actorRef ! message

//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}
Run Code Online (Sandbox Code Playgroud)

向 HttpResponse 提供源:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
Run Code Online (Sandbox Code Playgroud)

注意:该行和 HttpReponse 之间应该有很少的时间/代码,因为在该行执行dstream foreachRDD后,Actor 的内部缓冲区将立即开始填充来自 DStream 的 ByteString 消息。foreach