use*_*104 5 scala akka-stream spark-streaming akka-http
我正在构建一个REST API,它在Spark集群中启动一些计算,并使用一个分块的结果流进行响应.鉴于Spark流与计算结果,我可以使用
dstream.foreachRDD()
Run Code Online (Sandbox Code Playgroud)
从Spark发送数据.我正在使用akka-http发送分块的HTTP响应:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
Run Code Online (Sandbox Code Playgroud)
为简单起见,我试图首先使纯文本工作,稍后将添加JSON编组.
但是使用Spark DStream作为Akka流源的惯用方法是什么?我认为我应该可以通过套接字来实现它,但由于Spark驱动程序和REST端点位于相同的JVM上,因此开放套接字似乎有点过分.
在问题发生时不确定api的版本.但是现在,使用akka-stream 2.0.3,我相信你可以这样做:
val source = Source
.actorRef[T](/* buffer size */ 100, OverflowStrategy.dropHead)
.mapMaterializedValue[Unit] { actorRef =>
dstream.foreach(actorRef ! _)
}
Run Code Online (Sandbox Code Playgroud)
编辑:这个答案仅适用于旧版本的 Spark 和 akka。PH88的答案是最近版本的正确方法。
您可以使用提供源的中间体akka.actor.Actor(类似于此问题)。下面的解决方案不是“反应式”的,因为底层 Actor 需要维护 RDD 消息的缓冲区,如果下游 http 客户端没有足够快地消耗块,则该缓冲区可能会被删除。但无论实现细节如何,都会出现此问题,因为您无法将 akka 流背压的“节流”连接到 DStream 以减慢数据速度。这是由于 DStream 没有实现org.reactivestreams.Publisher.
基本拓扑结构为:
DStream --> Actor with buffer --> Source
Run Code Online (Sandbox Code Playgroud)
要构建此拓扑,您必须创建一个类似于此处实现的 Actor :
//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props
Run Code Online (Sandbox Code Playgroud)
基于JobManager创建ByteStrings(消息)的流Source。另外,将 转换ByteString为HttpEntity.ChunkStreamPartHttpResponse 所需的内容:
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString
type Message = ByteString
val messageToChunkPart =
Flow[Message].map(HttpEntity.ChunkStreamPart(_))
//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] =
Source(ActorPublisher[Message](actorRef)) via messageToChunkPart
Run Code Online (Sandbox Code Playgroud)
将 Spark DStream 链接到 Actor,以便将每个传入的 RDD 转换为 ByteString 的 Iterable,然后转发到 Actor:
import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD
val dstream : DStream = ???
//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???
def sendMessageToActor(message : Message) = actorRef ! message
//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}
Run Code Online (Sandbox Code Playgroud)
向 HttpResponse 提供源:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
Run Code Online (Sandbox Code Playgroud)
注意:该行和 HttpReponse 之间应该有很少的时间/代码,因为在该行执行dstream foreachRDD后,Actor 的内部缓冲区将立即开始填充来自 DStream 的 ByteString 消息。foreach
| 归档时间: |
|
| 查看次数: |
1629 次 |
| 最近记录: |