sut*_*lui 6 synchronization actor akka
这是我的场景:
我有一个主演员,它接收来自多个子演员的消息.这些消息包含要聚合的数据.在这个聚合逻辑中,如果我使用共享数据结构来收集聚合,是否需要处理同步问题?
else if(arg0 instanceof ReducedMsg){
ReducedMsg reduced = (ReducedMsg)arg0;
counter.decrementAndGet();
synchronized(finalResult){
finalResult.add((KeyValue<K, V>) reduced.getReduced());
if(counter.get() == 0){
if(checkAndReduce(finalResult)){
finalResult.clear();
}
else{
stop();
latch.countDown();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
因此,您可以看到我有一个finalResult,每个消息将被聚合到这个,并且在处理逻辑之后,集合也需要被清除.
实际上我想要实现的是递归(关联)缩减mapreduce.所以我需要保持我假设的同步块?或者Akka是否一次执行onReceive一个线程?
该逻辑在小数据集上产生准确且可预测的结果.我的问题是当我的输入数据集有点大时,代码挂起.我想确定这是因为同步块的上下文切换,所以我可能会遇到不同的设计.
Tom*_*icz 18
onReceive()是从来没有同时调用.这是Akka给您的最基本保证.
这意味着如果您的counter变量是actor中的一个字段而没有其他代码可以直接访问该字段,则可以安全地使用normal int/ long而不是AtomicInteger/ AtomicLong.finalResult假设它是封装并隐藏在actor中的字段,也不需要同步.
最后使用CountDownLatch是可疑的.在Akka应用程序中,您不应使用任何同步原语.参与者本质上是单线程的,所有通信(包括唤醒和传递数据)都应该通过消息传递来实现.
这些都在文档中解释:http://doc.akka.io/docs/akka/2.0.2/general/jmm.html#Actors_and_the_Java_Memory_Model