为什么使用 monix 或 akka-streams 将 A 类映射到 B 类如此缓慢?

Bou*_*ONE 0 scala akka-stream monix

我已经使用 monix 和 akka-streams 对 List[ClassA] 到 List[ClassB] 的映射进行了基准测试,但我不明白为什么它这么慢。

我尝试了不同的映射方式,这是 JMH 的结果:

[info] Benchmark                                    Mode  Cnt    Score    Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap            ss   20  742,626 â–’  4,853  ms/op
[info] MappingBenchmark.akkaMapAsyncFold              ss   20  480,460 â–’  8,493  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  331,398 â–’ 10,490  ms/op
[info] MappingBenchmark.akkaMapFold                   ss   20  713,500 â–’  7,394  ms/op
[info] MappingBenchmark.akkaMapFoldAsync              ss   20  313,275 â–’  8,716  ms/op
[info] MappingBenchmark.map                           ss   20    0,567 â–’  0,175  ms/op
[info] MappingBenchmark.monixBatchedObservables       ss   20  259,736 â–’  5,939  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  456,310 â–’  5,225  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  795,345 â–’  5,443  ms/op
[info] MappingBenchmark.monixMapFoldLeft              ss   20  247,172 â–’  5,342  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync         ss   20  478,840 â–’ 25,249  ms/op
[info] MappingBenchmark.monixTaskGather               ss   20    6,707 â–’  2,176  ms/op
[info] MappingBenchmark.parMap                        ss   20    1,257 â–’  0,831  ms/op
Run Code Online (Sandbox Code Playgroud)

这是代码:

package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
  import monix.eval._
  import monix.reactive._
  import monix.execution.Scheduler.Implicits.global

  def list: List[ClassA] = (1 to 10000).map(ClassA).toList
  //    val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  @Benchmark
  def map: List[ClassB] = list.map(o => ClassB(o, o))

  @Benchmark
  def parMap: List[ClassB] = list.par.map(o => ClassB(o, o)).toList

  // ##### MONIX ##### //

  @Benchmark
  def monixTaskGather: List[ClassB] = {
    val task: Task[List[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixBatchedObservables: List[ClassB] = {
    val task: Task[List[ClassB]] =
      Observable.fromIterable(list)
        .bufferIntrospective(256)
        .flatMap{items =>
          val tasks = items.map(o => Task(ClassB(o,o)))
          val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }.consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // ##### AKKA-STREAM ##### //

  @Benchmark
  def akkaMapFold: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapFoldAsync: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFold: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFoldAsync: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMap: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = {
      val sink: Sink[ClassB, Future[List[ClassB]]] = Sink.fold(List[ClassB]())(_ :+ _)
      RunnableGraph.fromGraph[Future[List[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  private def runAkkaGraph(g:RunnableGraph[Future[List[ClassB]]]): List[ClassB] = {
    implicit val actorSystem = ActorSystem("app")
    implicit val actorMaterializer = ActorMaterializer()
    val eventualBs = g.run()
    val res = Await.result(eventualBs, Duration.Inf)
    actorSystem.terminate()
    res
  }
}

case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)
Run Code Online (Sandbox Code Playgroud)

当初始集合更大时,基准结果变得更糟。

我想知道我的错误是什么。

感谢您分享您的知识!

此致

Ale*_*lcu 5

只是关于异步处理/并行性的说明......一般来说,当并行处理内容时,您最终会因同步结果而产生相当多的 CPU 绑定开销。

事实上,开销可能非常大,以至于它可以抵消您从多个 CPU 内核并行工作中获得的时间收益。

您还应该熟悉阿姆达尔定律。看看这些数字:如果并行部分为 75%,您只需 4 个处理器即可达到最大可能的加速。并行部分为 50%,您只需使用 2 个处理器即可达到最大加速。

这只是理论上的限制,因为您还拥有处理器之间的共享内存同步,这可能会变得非常混乱;基本上处理器针对顺序执行进行了优化。引入并发问题,您需要使用内存屏障强制排序,这会使许多 CPU 优化无效。因此,您可以达到负加速,正如在您的测试中实际看到的那样。

所以你正在测试异步/并行映射,但测试基本上什么都不做,不妨用身份函数进行测试,它几乎是一样的。换句话说,你正在做的测试及其结果在实践中几乎没有用

顺便说一句,这也是我从不喜欢“平行系列”这个想法的原因。这个概念是有缺陷的,因为您只能将并行集合用于纯粹受 CPU 限制的内容(即没有 I/O,没有实际的异步内容),也就是说,它可以用于进行一些计算,除了:

  1. 出于许多目的,并行集合的使用比使用单个 CPU 的普通运算符要慢,并且
  2. 如果您确实有 CPU 密集型工作,并且需要最大限度地使用硬件资源,那么当前版本中的“并行集合”实际上是错误的抽象,因为如今的“硬件”包括 GPU

换句话说,并行集合没有有效地使用硬件资源,因为它们完全忽略了 GPU 支持,并且完全不适合混合 CPU - I/O 任务,因为它们缺乏异步支持。

我觉得有必要提到这一点,因为人们常常认为在他们的代码上擦一些“并行”小精灵会使它运行得更快,但很多时候它不会。

当您有 I/O 密集型任务(当然与 CPU 密集型任务混合)时,并行性非常有效,在这种情况下,CPU 开销要小得多,因为处理时间将由 I/O 主导。

PS:Scala 集合上的普通映射应该更快,因为它是严格的,并且(取决于集合类型)它使用数组支持的缓冲区,因此不会破坏 CPU 缓存。Monix 的.map开销与 Scala 的开销相同Iterable.map,或者换句话说接近于零的开销,但它的应用程序是惰性的,并引入了一些装箱开销,我们无法摆脱,因为 JVM 没有专门研究泛型。

不过在实践中它非常快;-)