Dam*_*ian 0 scala akka akka-stream akka-http apache-kafka-streams
我们正在使用 Kafka,并希望使用交互式查询来访问我们状态存储中的数据。我们有一个现有的服务,它使用 Akka HTTP 来提供 REST API,我们希望将交互式查询集成到流程中。
似乎kafka-streams-query非常适合这个。但是,它通过公开route使用低级 API的属性集成到 Akka HTTP 中,该属性映射到Flow[HttpRequest, HttpResponse, Any]. 我们之前的所有代码都使用 Akka HTTP 的路由 DSL 连接代码。
我希望像下面这样的代码可以工作,但它没有:
implicit val system:ActorSystem = ActorSystem("web")
implicit val materializer:ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher
val firstRoutes:Route = ... //a route object populated
val lastRoutes:Route = ... //other route object populad
val iqServiceFlow:Flow[HttpRequest, HttpResponse, Any] = ...// code that returns the interactive query service
val firstFlow = Route.handlerFlow(firstRoutes)
val lastFlow = Route.handlerFlow(lastRoutes)
// The following code doesn't work though everything I've seen online suggests it should
val handler = firstFlow via iqServiceFlow via lastFlow
Http().bindAndHandle(handler, "0.0.0.0", 8000)
Run Code Online (Sandbox Code Playgroud)
如何在 Akka Streams 中组合流?
编辑:更正了处理程序分配语句。
为了清楚起见,让我们首先明确所有返回类型:
val iqServiceFlow: Flow[HttpRequest, HttpResponse, Any] = ...
val firstFlow: Flow[HttpRequest, HttpResponse, NotUsed] = Route.handlerFlow(firstRoutes)
val lastFlow: Flow[HttpRequest, HttpResponse, NotUsed] = Route.handlerFlow(lastRoutes)
Run Code Online (Sandbox Code Playgroud)
此外,而不是...
val handler = firstRoutes via iqServiceFlow via lastFlow
Run Code Online (Sandbox Code Playgroud)
......你可能的意思是:
val handler = firstFlow via iqServiceFlow via lastFlow
Run Code Online (Sandbox Code Playgroud)
为了将流与 链接在一起via,输入和输出类型必须匹配:即,第一个流的输出类型必须与第二个流的输入类型相同,依此类推。您尝试对处理程序执行的操作如下:
[HttpRequest, HttpResponse] // firstFlow
|
v
[HttpRequest, HttpResponse] // iqServiceFlow
|
v
[HttpRequest, HttpResponse] // lastFlow
Run Code Online (Sandbox Code Playgroud)
您所有流的输出类型是HttpResponse,但它们各自的输入类型都是HttpRequest,因此您不能将它们与via.
要链接您的流程,您需要一个以某种方式将 a 转换HttpResponse为 an的函数HttpRequest:
val respToReq: HttpResponse => HttpRequest = ...
Run Code Online (Sandbox Code Playgroud)
您可以从上述函数创建流:
val convertingFlow: Flow[HttpResponse, HttpRequest] = Flow.fromFunction(respToReq)
Run Code Online (Sandbox Code Playgroud)
现在您可以链接您的流程:
val handler = firstFlow via convertingFlow via iqServiceFlow via convertingFlow via lastFlow
Run Code Online (Sandbox Code Playgroud)
类型对齐如下:
[HttpRequest, HttpResponse] // firstFlow
|
v
[HttpResponse, HttpRequest] // convertingFlow
|
v
[HttpRequest, HttpResponse] // iqServiceFlow
|
v
[HttpResponse, HttpRequest] // convertingFlow
|
v
[HttpRequest, HttpResponse] // lastFlow
Run Code Online (Sandbox Code Playgroud)
以上假设您可以重用相同的转换函数/流程。如果这个假设不成立,您显然可以创建不同的转换函数/流。