标签: akka-dispatcher

在Akka Dispatcher上启动后,为什么Futures中的Futures按顺序运行

当我们尝试从参与者的接收方法中启动多个期货时,我们观察到一种奇怪的行为。如果我们将配置的调度程序用作ExecutionContext,则期货将在同一线程上按顺序运行。如果我们使用ExecutionContext.Implicits.global,则期货将按预期并行运行。

我们将代码简化为以下示例(下面是一个更完整的示例):

implicit val ec = context.getDispatcher

Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }

Future {
   Future{ doWork() } 
   Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
   Future{ doWork() }
   Future{ doWork() }
}
Run Code Online (Sandbox Code Playgroud)

一个可编译的示例如下:

import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")   

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running in …
Run Code Online (Sandbox Code Playgroud)

parallel-processing scala future akka akka-dispatcher

6
推荐指数
1
解决办法
361
查看次数

Akka Streams Reactive Kafka - 高负荷下的OutOfMemoryError

我正在运行Akka Streams Reactive Kafka应用程序,该应用程序应该在高负载下运行.运行应用程序大约10分钟后,应用程序关闭了OutOfMemoryError.我试图调试堆转储,发现它akka.dispatch.Dispatcher占用了大约5GB的内存.以下是我的配置文件.

Akka版本:2.4.18

Reactive Kafka版本:2.4.18

1 application.conf.:

consumer {
num-consumers = "2"
c1 {
  bootstrap-servers = "localhost:9092"
  bootstrap-servers=${?KAFKA_CONSUMER_ENDPOINT1}
  groupId = "testakkagroup1"
  subscription-topic = "test"
  subscription-topic=${?SUBSCRIPTION_TOPIC1}
  message-type = "UserEventMessage"
  poll-interval = 100ms
  poll-timeout = 50ms
  stop-timeout = 30s
  close-timeout = 20s
  commit-timeout = 15s
  wakeup-timeout = 10s
  use-dispatcher = "akka.kafka.default-dispatcher"
  kafka-clients {
    enable.auto.commit = true
  }
}  
Run Code Online (Sandbox Code Playgroud)

2 . build.sbt:

java -Xmx6g \
-Dcom.sun.management.jmxremote.port=27019 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=localhost \
-Dzookeeper.host=$ZK_HOST \ …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream akka-dispatcher reactive-kafka

3
推荐指数
1
解决办法
287
查看次数