创建用于并行处理集合元素的 Akka 流

Vag*_*lov 3 akka akka-stream

我正在尝试为包含并行处理流的 Akka 流定义一个图形(我正在使用 Akka.NET 但这应该无关紧要)。想象一个订单的数据源,每个订单由一个订单 ID 和一个产品列表(订单项)组成。工作流程如下:

  1. 接收和订购
  2. 将订单广播到两个流,流 A 将处理订单项,通道 B 将处理订单 ID(一些簿记工作)
  3. 流程 A:将订单项集合拆分为单独的元素,每个元素单独处理
  4. 流程 A:对于由上一步拆分产生的每个订单项目,调用一些外部服务来查找额外信息(价格、可用性等)
  5. 流程 B:为给定的订单 ID 做一些额外的记账
  6. 合并流 A 和 B
  7. 将上一步合并的数据发送到接收器,从而生成丰富的订单信息

Steps 1 (Source.From), 2 (Broadcast), 4-5 (Map), 6 (Merge), 7 (Sink) 看起来没问题。但是如何在 Akka 或响应式流术语中实现集合拆分?这不是广播或展平,需要将 N 个元素的集合拆分为 N 个独立的子流,然后再合并回来。这是如何实现的?

exp*_*ert 5

我建议在一个流程中完成。我知道两个流程看起来更酷,但相信我,就设计的简单性而言,这是不值得的(我试过)。你可以这样写

import akka.stream.scaladsl.{Flow, Sink, Source, SubFlow}

import scala.collection.immutable
import scala.concurrent.Future

case class Item()

case class Order(items: List[Item])

val flow = Flow[Order]
  .mapAsync(4) { order =>
    Future {
      // Enrich your order here
      order
    }
  }
  .mapConcat { order =>
    order.items.map(order -> _)
  }
  .mapAsync(4) { case (order, item) =>
    Future {
      // Enrich your item here
      order -> item
    }
  }
  .groupBy(2, tuple => tuple._1)
  .fold[Map[Order, List[Item]]](immutable.Map.empty) { case (map, (order, item)) => map.updated(order, map.getOrElse(order, Nil) :+ item) }
  .mapConcat { _.map { case (order, newItems) => order.copy(items = newItems)} }
Run Code Online (Sandbox Code Playgroud)

但即使这种方法也很糟糕。无论是上面的代码还是您的设计,都有很多事情可能会出错。如果订单项目之一的扩充失败,您会怎么做?如果订单对象的丰富失败怎么办?你的流应该怎么办?

如果我是你,我会拥有Flow[Order]并处理它的孩子,mapAsync所以至少它保证我没有部分处理的订单。