如何改善reactive-kafka(Scala plus Akka Streams)的缓慢性能?

Gre*_*reg 9 scala akka apache-kafka

我正在将我的项目从RabbitMQ转移到Kafka,并试图了解reactive-kafka的速度有多快.

我现在能够向Rabbit写入大约12K /秒的简单消息/秒,并且在读取时通过"hello world"流以大约4K /秒的速度从队列中轻松拉动.

我用反应流移动到卡夫卡,我可以写1M或秒 - 巨大的胜利!但是在相同的环境中,我只能使用此处示例中的方法在读取流中以大约2K /秒的速度流动:DummyConsumer.scala

有没有人知道如何将读取回到与Rabbit方法相当的水平?

有趣的是:我只是"直接"尝试(通过原始Java驱动程序访问Kafka与反应卡夫卡),并获得大约22K读取,所以这非常好.它关于我如何使用被动反应卡夫卡正在减慢事情的速度.

好的......我正在寻找这个东西.接下来我尝试了原始的Akka流"hello world":

now = System.currentTimeMillis()
count = 0
val in2 = Source(1 to num)
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val show = Flow[Int].map{ i => count +=1; if(count==num) println(s"time 2 ($count): "+(System.currentTimeMillis() - now)); i }
  in2 ~> show ~> Sink.ignore
  ClosedShape
})
g.run()
Thread.sleep(2000)
Run Code Online (Sandbox Code Playgroud)

这个速度非常快,为742K /秒!所以Kafka raw非常快,Akka流很快.因此,罪魁祸首就在于如何构建反应性kafka(或者更有可能)我试图如何使用它.考虑到摩擦,我应该期待看到接近原始卡夫卡22K /秒的东西.嗯.

Von*_*onC 0

不是完整的答案,但请再试一次,但最近(2016 年 9 月)Akka Streams Kafka 0.11

\n
\n

有显着的性能改进。我们\xe2\x80\x99已经做了一些基准测试,虽然reactive-kafka包装器仍然有一些开销(但请记住,你也会得到一些回报:例如,所有的好处都来自于一个好的异步API背压!),总体数字看起来非常好并且正在变得更好。

\n

这里\xe2\x80\x99是一些常见的Kafka使用场景,比较了旧的reactive-kafka版本(M4)、当前版本(0.11)以及使用普通Kafka Producers/实现的等效功能Consumers(但当然没有考虑到,与任何其他反应组件连接)。

\n
\n

http://blog.akka.io/assets/kafka-bench1.png

\n

更多信息请参见Krzysiek Ciesielski的“ Benchmarking akka-stream-kafka ” 。

\n