如何在 AKKA Actor 中保持线程安全?

Yul*_*lin 0 java concurrency multithreading thread-safety akka

我的项目需要大量的异步编程,所以我选择AKKA平台,因为使用actor模型可以像编写同步代码一样实现异步系统,而不必担心线程问题。一切正常,直到我遇到以下问题(演示代码):

import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import java.util.concurrent.locks.ReentrantLock;

public class TestActor extends AbstractActor {
    private final ReentrantLock lock  = new ReentrantLock();

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .matchEquals("lock", s -> lock.lock())
                .matchEquals("unlock", s -> lock.unlock())
                .build();
    }
}
Run Code Online (Sandbox Code Playgroud)

首先发送“锁定”消息,然后发送“解锁”消息,在收到发送消息后尝试解锁时,抛出了一个IllegalMonitorStateException,我发现这是由于不同的消息实际上由不同的线程处理,s -> lock.lock()并且s -> lock.unlock()是在不同的线程中执行,因此IllegalMonitorStateException抛出。

我之前的假设是,演员的所有操作都在一个线程中执行,因此它是完全线程安全的,不必担心线程问题。由于我在项目中广泛使用 AKKA,现在我非常担心并且不清楚第一剂在使用 AKKA 时何时需要考虑线程问题。例如,在以下演示代码中:

public class TestActor1 extends AbstractActor {
    private int count  = 0;
    private Map<Integer, Integer> map = new HashMap<>();
    
    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .matchEquals("action1", s -> count++)
                .matchEquals("action2", s -> getSender().tell(count, getSelf()))
                .matchEquals("action3", s -> map.put(3, 2))
                .matchEquals("action4", s -> getSender().tell(map.get(3), getSelf()))
                .build();
    }
}
Run Code Online (Sandbox Code Playgroud)

方法countmap使用的线程安全吗?我需要使用volatileforcount和 use ConcurrentHashMapfor吗map

ps======================

下面的演示代码演示了为什么我需要锁定actor,基本上我正在实现一个具有背压控制的管道,一旦actor从上游actor接收到太多任务,它就会backPressureHi向上游actor发送一条消息以停止上游actor执行循环直到背压恢复正常并发送backPressureNormal恢复:

public class PipeLineActor extends AbstractActor {
    private final ReentrantLock stallLock = new ReentrantLock();

    private Thread executionLoop = new Thread(() -> {
        while (true){
            stallLock.lock();
            stallLock.unlock();
            
            // issue tasks to down stream actors
        }
    });

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                // down stream actor send "backPressureHi" when back pressure is high to stall the executionLoop
                .matchEquals("backPressureHi", s -> stallLock.lock())
                // down stream actor send "backPressureNormal" when back pressure resumed normal to resume the executionLoop
                .matchEquals("backPressureNormal", s -> stallLock.unlock())
                .build();
    }
}
Run Code Online (Sandbox Code Playgroud)

Iva*_*iuc 5

Akka 被设计为线程安全的。并且 Actor 内永远不需要锁定或同步。不应该这样做。

\n

Akka 通过一次处理一条消息来实现线程安全。一个参与者不能同时处理多个消息。但消息可能并且将会在不同的线程中处理。(这是默认行为,但可以使用 pin 调度程序进行更改)。

\n

来自文档

\n
\n

由于参与者实例一次处理一条消息,因此不需要同步保护或 AtomicInteger 等并发保护。

\n
\n

对于你的最后一个问题,

\n
\n

count 和 map 的使用方式是线程安全的吗?

\n
\n

是的,它是线程安全的。

\n
\n

我是否需要使用 volatile 进行计数并使用 ConcurrentHashMap 进行映射?

\n
\n

不,没有必要这样做。请参阅Akka 和 Java 内存模型

\n
\n

用外行人的术语来说,这意味着当该 actor 处理下一条消息时,该 actor 的内部字段的更改是可见的。所以\n你的actor中的字段不需要是易失的或等效的。

\n
\n