我正在尝试为包含并行处理流的 Akka 流定义一个图形(我正在使用 Akka.NET 但这应该无关紧要)。想象一个订单的数据源,每个订单由一个订单 ID 和一个产品列表(订单项)组成。工作流程如下:
Steps 1 (Source.From), 2 (Broadcast), 4-5 (Map), 6 (Merge), 7 (Sink) 看起来没问题。但是如何在 Akka 或响应式流术语中实现集合拆分?这不是广播或展平,需要将 N 个元素的集合拆分为 N 个独立的子流,然后再合并回来。这是如何实现的?
我建议在一个流程中完成。我知道两个流程看起来更酷,但相信我,就设计的简单性而言,这是不值得的(我试过)。你可以这样写
import akka.stream.scaladsl.{Flow, Sink, Source, SubFlow}
import scala.collection.immutable
import scala.concurrent.Future
case class Item()
case class Order(items: List[Item])
val flow = Flow[Order]
.mapAsync(4) { order =>
Future {
// Enrich your order here
order
}
}
.mapConcat { order =>
order.items.map(order -> _)
}
.mapAsync(4) { case (order, item) =>
Future {
// Enrich your item here
order -> item
}
}
.groupBy(2, tuple => tuple._1)
.fold[Map[Order, List[Item]]](immutable.Map.empty) { case (map, (order, item)) => map.updated(order, map.getOrElse(order, Nil) :+ item) }
.mapConcat { _.map { case (order, newItems) => order.copy(items = newItems)} }
Run Code Online (Sandbox Code Playgroud)
但即使这种方法也很糟糕。无论是上面的代码还是您的设计,都有很多事情可能会出错。如果订单项目之一的扩充失败,您会怎么做?如果订单对象的丰富失败怎么办?你的流应该怎么办?
如果我是你,我会拥有Flow[Order]并处理它的孩子,mapAsync所以至少它保证我没有部分处理的订单。
| 归档时间: |
|
| 查看次数: |
1338 次 |
| 最近记录: |