什么是 Akka Streams 中的 Flow#join

ryo*_*ryo 4 scala akka akka-stream

我是 Scala 中 Akka Streams 的学习者。当我在阅读时IncomingConnection,我发现Flow#join. 然后,我在评论中找到了以下图片Flow#join

+------+        +-------+
|      | ~Out~> |       |
| this |        | other |
|      | <~In~  |       |
+------+        +-------+
Run Code Online (Sandbox Code Playgroud)

但是,我想知道它的结构是什么。我认为“加入”会形成一个循环。

所以我希望你解释什么结构“join”,并告诉我简单的示例代码使用 Flow#join

Lod*_*rds 8

文档状态:

通过交叉连接输入和输出,将这个 Flow 加入另一个 Flow ,创建一个RunnableGraph

这是一个很好的例子akka.http.scaladsl,可以帮助解释为什么这很有用:

  /**
   * Represents one accepted incoming HTTP connection.
   */
  final case class IncomingConnection(
    localAddress: InetSocketAddress,
    remoteAddress: InetSocketAddress,
    flow: Flow[HttpResponse, HttpRequest, NotUsed]) {

    /**
     * Handles the connection with the given flow, which is materialized exactly once
     * and the respective materialization result returned.
     */
    def handleWith[Mat](handler: Flow[HttpRequest, HttpResponse, Mat])(implicit fm: Materializer): Mat =
      flow.joinMat(handler)(Keep.right).run()
Run Code Online (Sandbox Code Playgroud)

正如您可能知道handlerAkka http 流的 a 总是从HttpRequest到流HttpResponse,但正如您所看到的,IncomingConnection.flow流从HttpResponse到 a HttpRequest。换句话说,用户有责任根据请求创建响应,而 Akka Http 有责任发送该响应并生成另一个请求。当它涉及另一个 Flow 时,这确实会形成一个闭环,因此该join方法创建了一个RunnableGraph.

要了解如何处理连接,您应该了解更多关于BidiFlow. a 的结果BidiFlow#join是另一个流,因为 BidiFlow 有两个输入和两个输出。这是一个带有示例的出色解释链接