当我们尝试从参与者的接收方法中启动多个期货时,我们观察到一种奇怪的行为。如果我们将配置的调度程序用作ExecutionContext,则期货将在同一线程上按顺序运行。如果我们使用ExecutionContext.Implicits.global,则期货将按预期并行运行。
我们将代码简化为以下示例(下面是一个更完整的示例):
implicit val ec = context.getDispatcher
Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }
Future {
Future{ doWork() }
Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
Future{ doWork() }
Future{ doWork() }
}
Run Code Online (Sandbox Code Playgroud)
一个可编译的示例如下:
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}
object WhyNotParallelExperiment extends App {
val actorSystem = ActorSystem(s"Experimental")
// Futures not started in future: running in parallel
startFutures(runInFuture = false)(actorSystem.dispatcher)
Thread.sleep(5000)
// Futures started in future: running in …Run Code Online (Sandbox Code Playgroud) 我正在运行Akka Streams Reactive Kafka应用程序,该应用程序应该在高负载下运行.运行应用程序大约10分钟后,应用程序关闭了OutOfMemoryError.我试图调试堆转储,发现它akka.dispatch.Dispatcher占用了大约5GB的内存.以下是我的配置文件.
Akka版本:2.4.18
Reactive Kafka版本:2.4.18
1 application.conf.:
consumer {
num-consumers = "2"
c1 {
bootstrap-servers = "localhost:9092"
bootstrap-servers=${?KAFKA_CONSUMER_ENDPOINT1}
groupId = "testakkagroup1"
subscription-topic = "test"
subscription-topic=${?SUBSCRIPTION_TOPIC1}
message-type = "UserEventMessage"
poll-interval = 100ms
poll-timeout = 50ms
stop-timeout = 30s
close-timeout = 20s
commit-timeout = 15s
wakeup-timeout = 10s
use-dispatcher = "akka.kafka.default-dispatcher"
kafka-clients {
enable.auto.commit = true
}
}
Run Code Online (Sandbox Code Playgroud)
2 . build.sbt:
java -Xmx6g \
-Dcom.sun.management.jmxremote.port=27019 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=localhost \
-Dzookeeper.host=$ZK_HOST \ …Run Code Online (Sandbox Code Playgroud)