在 Akka Streams 中连接流

Dam*_*ian 0 scala akka akka-stream akka-http apache-kafka-streams

我们正在使用 Kafka,并希望使用交互式查询来访问我们状态存储中的数据。我们有一个现有的服务,它使用 Akka HTTP 来提供 REST API,我们希望将交互式查询集成到流程中。

似乎kafka-streams-query非常适合这个。但是,它通过公开route使用低级 API的属性集成到 Akka HTTP 中,该属性映射到Flow[HttpRequest, HttpResponse, Any]. 我们之前的所有代码都使用 Akka HTTP 的路由 DSL 连接代码。

我希望像下面这样的代码可以工作,但它没有:

implicit val system:ActorSystem = ActorSystem("web")
implicit val materializer:ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher

val firstRoutes:Route = ... //a route object populated
val lastRoutes:Route = ... //other route object populad

val iqServiceFlow:Flow[HttpRequest, HttpResponse, Any] = ...// code that returns the interactive query service

val firstFlow = Route.handlerFlow(firstRoutes)
val lastFlow = Route.handlerFlow(lastRoutes)

// The following code doesn't work though everything I've seen online suggests it should
val handler = firstFlow via iqServiceFlow via lastFlow

Http().bindAndHandle(handler, "0.0.0.0", 8000)
Run Code Online (Sandbox Code Playgroud)

如何在 Akka Streams 中组合流?

编辑:更正了处理程序分配语句。

Jef*_*ung 5

为了清楚起见,让我们首先明确所有返回类型:

val iqServiceFlow: Flow[HttpRequest, HttpResponse, Any] = ...
val firstFlow: Flow[HttpRequest, HttpResponse, NotUsed] = Route.handlerFlow(firstRoutes)
val lastFlow: Flow[HttpRequest, HttpResponse, NotUsed]  = Route.handlerFlow(lastRoutes)
Run Code Online (Sandbox Code Playgroud)

此外,而不是...

val handler = firstRoutes via iqServiceFlow via lastFlow
Run Code Online (Sandbox Code Playgroud)

......你可能的意思是:

val handler = firstFlow via iqServiceFlow via lastFlow
Run Code Online (Sandbox Code Playgroud)

为了将流与 链接在一起via,输入和输出类型必须匹配:即,第一个流的输出类型必须与第二个流的输入类型相同,依此类推。您尝试对处理程序执行的操作如下:

[HttpRequest, HttpResponse] // firstFlow
                   |
                   v
             [HttpRequest, HttpResponse] // iqServiceFlow
                                |
                                v
                          [HttpRequest, HttpResponse] // lastFlow
Run Code Online (Sandbox Code Playgroud)

您所有流的输出类型是HttpResponse,但它们各自的输入类型都是HttpRequest,因此您不能将它们与via.

要链接您的流程,您需要一个以某种方式将 a 转换HttpResponse为 an的函数HttpRequest

val respToReq: HttpResponse => HttpRequest = ...
Run Code Online (Sandbox Code Playgroud)

您可以从上述函数创建流:

val convertingFlow: Flow[HttpResponse, HttpRequest] = Flow.fromFunction(respToReq)
Run Code Online (Sandbox Code Playgroud)

现在您可以链接您的流程:

val handler = firstFlow via convertingFlow via iqServiceFlow via convertingFlow via lastFlow
Run Code Online (Sandbox Code Playgroud)

类型对齐如下:

[HttpRequest, HttpResponse] // firstFlow
                   |
                   v
             [HttpResponse, HttpRequest] // convertingFlow
                                |
                                v
                           [HttpRequest, HttpResponse] // iqServiceFlow
                                              |
                                              v
                                        [HttpResponse, HttpRequest] // convertingFlow
                                                            |
                                                            v                              
                                                      [HttpRequest, HttpResponse] // lastFlow
Run Code Online (Sandbox Code Playgroud)

以上假设您可以重用相同的转换函数/流程。如果这个假设不成立,您显然可以创建不同的转换函数/流。