cod*_*r25 1 asynchronous akka akka-stream
mapAsync我正在比较和 之间的区别async
object Demo3 extends App{
implicit val system = ActorSystem("MyDemo")
implicit val materializer = ActorMaterializer()
private val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
private def test(a:Int) ={
println(s"Flow A : ${Thread.currentThread().getName()}" )
Future(println(a+1))(ec)
}
Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()
}
Run Code Online (Sandbox Code Playgroud)
输出
Flow A : MyDemo-akka.actor.default-dispatcher-2
2
Flow A : MyDemo-akka.actor.default-dispatcher-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-2
Flow A : MyDemo-akka.actor.default-dispatcher-2
4
Flow A : MyDemo-akka.actor.default-dispatcher-2
5
Flow A : MyDemo-akka.actor.default-dispatcher-2
6
Flow A : MyDemo-akka.actor.default-dispatcher-2
7
Flow A : MyDemo-akka.actor.default-dispatcher-2
8
Flow A : MyDemo-akka.actor.default-dispatcher-2
9
Flow A : MyDemo-akka.actor.default-dispatcher-2
10
11
Run Code Online (Sandbox Code Playgroud)
为什么尽管并行度为 10,它仍显示一个线程名称。它不是异步运行吗?当我用 line 替换它时Source(1 to 100).map(test).async.to(Sink.ignore).run(),mapAsyncandasync每次都使用单线程吗?
输出
Flow A : MyDemo-akka.actor.default-dispatcher-4
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
9
Flow A : MyDemo-akka.actor.default-dispatcher-4
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
11
Run Code Online (Sandbox Code Playgroud)
在 中test,println打印线程 ID 是在 future 之外执行的,因此它是同步执行的。ExecutionContextfuture 中的代码将在(在本例中是 actor 系统的调度程序)的线程上执行。值得注意的是,发生了一些并行执行:线程 print fora = 4发生在a + 1print for之前a = 3。
如果将线程移到println未来,它将println异步执行:
Future {
println(s"Flow A : ${Thread.currentThread().getName()}")
println(a+1)
}(ec)
Run Code Online (Sandbox Code Playgroud)
请注意,在您的测试代码中,您不太可能看到太多并行执行:生成未来所涉及的工作量接近未来完成的工作量(即使将来有第二次打印),因此,生成的 future 通常会在下一个 future 生成之前完成。
mapAsync最好将其视为同步调用返回 future 的代码(future 在返回时可能已完成,也可能未完成)并将该 future 存储在大小为 的缓冲区中parallelism。当该缓冲区中的 future 成功完成时,它完成的值将被发出,并且缓冲区中的槽被释放,从而允许mapAsync请求另一个元素(我在技术上进行描述mapAsyncUnordered,因为它更简单:mapAsync直到之前创建的每个 future 才会发出已完成的元素已成功完成并发出;我实际上不知道稍后完成的元素是否会在缓冲区中打开一个插槽)。这是否真正导致并行性取决于 future 的细节以及它是如何完成的(例如,如果 future 每次都是同一个参与者的请求,则有效并行性不可能超过 1)。
async在我看来,可能应该被称为stageBoundary或类似的东西,正是因为它经常导致人们这样认为mapAsync,并且map(...).async有很多共同点。async是一个信号,表明前一个和此Materializer之间的阶段不应与 后的阶段融合。通常,融合的阶段由单个演员执行。这样做的优点是消除了从一个阶段到另一个阶段传输元素的开销,但代价是通常将融合阶段中执行的元素数量限制为 1。融合阶段之间有一个隐式缓冲区:下游阶段将发出信号基于其缓冲区中的空槽的需求。这两个阶段将并行处理,即之前的(可能融合的)阶段可以在处理元素的同时,之后的(可能融合的)阶段处理元素。这基本上是流水线执行。asyncasyncasyncActorMaterializerasyncasync
因此,Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()整个流被具体化为单个参与者,其中(实际上:这是符合如何具体化流的要求的描述)在单个参与者中(因此,除了计划到 的任务之外,所有这些都ExecutionContext执行同步地、按顺序):
Sink.ignore有效地发出无限的需求信号mapAsyncmapAsync有 10 个空槽,因此从源请求 10 个元素source发出 1mapAsync打印当前线程,创建一个Promise[Unit],创建一个看起来像这样的闭包对象new ClosureClassName { val a: Int = 1; def run(): Unit = println(a+1) }
Run Code Online (Sandbox Code Playgroud)
并安排一个任务,在ec该任务上运行run闭包方法并完成承诺的未来。会ec根据 的逻辑将任务调度到一个线程上异步执行ec;同时我们的演员将未来保存在其缓冲区中(称之为futureBuffer(0))
futureBuffer(0),它已经完成(其中(), 的单例值Unit),mapAsync发出()并Sink.ignore清除futureBuffer(0)Sink.ignore,好吧,忽略()它收到的source现在发出 2mapAsync执行如上,仅a = 2在闭包中futureBuffer(0)(现在的未来a = 2)尚未完成,所以source现在发出 3mapAsync如上所述执行,a = 3在闭包中,将未来保存为futureBuffer(1)futureBuffer(0)都已futureBuffer(1)完成,因此futureBuffer(0)的值被发送到Sink.ignore并被futureBuffer(0)清除Sink.ignore忽略该值futureBuffer(1)的值被发送Sink.ignore并被futureBuffer(1)清除Sink.ignore忽略该值因此,存在一点点并行性mapAsync:实现的并行性本质上是未完成的 future 的数量。
因为Source(1 to 100).map(test).async.to(Sink.ignore).run()这将具体化为类似的东西
Actor A (Source.map)
^
| Future[Unit] sent down, demand signal sent up
v
Actor B (Sink.ignore)
Run Code Online (Sandbox Code Playgroud)
假设物化器设置的每个 actor 有 2 个元素的接收缓冲区。
Sink.ignore有效地发出无限的需求信号Actor BB其缓冲区中有 2 个空闲槽,因此它向Actor A需要的 3 个元素发送一条消息A将此需求传递给map,它要求源中的 1 个元素map如上所述打印当前线程等。它不会将结果(可能已完成或未完成)保存在缓冲区中(它没有缓冲区),而是将未来发送给AA将未来传递给B从现在开始,A和B正在并行处理(至少在这种情况下的某些时候,因为B只是将元素发送到位桶)
B将收到的 future 传递给Sink.ignore,忽略 future (甚至不关心 future 是否已完成,甚至不关心 future 是否失败)等等...一旦 B 收到了三个元素,它将发出另外两个元素的需求信号(假设,很可能,尚未Sink完成忽略未来并且 2 个元素的缓冲区为空)。
在整个过程中值得注意的是,一个 actor 可能会在其运行的线程上从一条消息更改为另一个消息(是否这样做取决于 的ActorSystem调度程序),但 actor 保持着一种单线程错觉:它只在某个时刻使用一个线程。一次。
| 归档时间: |
|
| 查看次数: |
428 次 |
| 最近记录: |