Akka模型用于计算任务

use*_*mda 7 java blocking akka

我有以下要求

  • 使用用户名和密码连接到Web服务器并获取验证令牌
  • 读取文件以获取不同的参数
  • 使用步骤1中的身份验证令牌和步骤2中的参数向Web服务器发送http请求

现在我有一个演员执行上述所有任务,如下所示

package akka.first.java;

import akka.actor.UntypedActor;

public class MySingleActor extends UntypedActor {

    public void onReceive(Object msg) {

        if( msg instanceof sendRequest ) {

            //Connect to a webserver with a username and password and get an authetication token
            String token = getToken();
           // Read file to get different parameters
            Param param = readFile();
           // Use the auth token fro step 1 and parameters from step 2 to send an http request to the web server
            Response response = sendRequest (server, token, param);


        }

    }

    private Param readFile() {
        // reads file 
    }

    private String getToken() {
        //gets token 
    }
}
Run Code Online (Sandbox Code Playgroud)

readFile操作包含各种子任务,我认为它应该是一个单独的actor.但是,由于演员执行发送请求的主要任务需要从readFile()操作返回,这可能是阻塞,根据文档不推荐,最好的方法是什么?期货?

Ole*_*tov 5

官方文档提供以下解决方案

  • 在一个actor(或由路由器[Java,Scala]管理的一组actor)中进行阻塞调用,确保配置一个专用于此目的或足够大小的线程池.
  • 在Future中进行阻塞调用,确保在任何时间点对此类调用的数量进行上限(提交此类无限数量的任务将耗尽您的内存或线程限制).
  • 在Future中执行阻塞调用,提供一个线程池,其中包含适用于运行应用程序的硬件的线程数上限.
  • 奉献一个线程来管理一组阻塞资源(如NIO选择驱动多个频道),因为它们发生担任男主角的消息和调度的事件.

使用期货是官方建议的方法之一,但要特别小心.

让我们考虑第一种方法,因为IMO更加一致.

首先,将所有阻塞IO操作提取到仅执行一个阻塞IO操作的新actor中.假设为简洁起见只有一个这样的操作:

public class MyBlockingIOActor extends UntypedActor {
    public void onReceive(Object msg) {
        // do blocking IO call here and send the result back to sender
    }
}
Run Code Online (Sandbox Code Playgroud)

actor系统配置文件中添加用于调度程序的配置,该配置将负责阻塞actor (通常):application.conf

#Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}
Run Code Online (Sandbox Code Playgroud)

请确保在创建actor系统时使用配置文件(特别是如果您决定使用非标准文件名进行配置):

ActorSystem actorSystem = ActorSystem.create("my-actor-system", ConfigFactory.load("application.conf"));
Run Code Online (Sandbox Code Playgroud)

之后,您希望将执行阻塞IO的actor分配给专用调度程序.您可以在此处所述的配置中或创建actor时执行此操作:

ActorRef blockingActor = context().actorOf(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher"));
Run Code Online (Sandbox Code Playgroud)

为了获得更多吞吐量,请考虑将阻塞actor包装到池中:

SupervisorStrategy strategy = new OneForOneStrategy(
        5,
        Duration.create(1, TimeUnit.MINUTES),
        Collections.singletonList(Exception.class)
);
ActorRef blockingActor = context().actorOf(new SmallestMailboxPool(5).withSupervisorStrategy(strategy).props(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher")));
Run Code Online (Sandbox Code Playgroud)

您可以通过以下方式确保actor使用正确的调度程序:

public class MyBlockingIOActor extends UntypedActor {
    public void preStart() {
        LOGGER.debug("using dispatcher: {}", ((Dispatcher)context().dispatcher()).id());
    }
}
Run Code Online (Sandbox Code Playgroud)


gas*_*ton 2

您可以使用 Futures,或者使用带有 Observables 和 Observers 的 RxJava。或者不同的参与者并将最终响应转发给原始发件人

  public class MySingleActor extends UntypedActor{

private ActorRef tokenActor;
private ActorRef readFileActor;

public MySingleActor(){
    tokenActor = context().actorOf(Props.create(TokenActor.class),"tokenActor");
    readFileActor = context().actorOf(Props.create(ReadFileActor.class),"readFileActor");
}
public void onReceive(Object msg) {
    if( msg instanceof sendRequest ) {
        Future<String> f= Futures.future(new Callable<String>() {
            @Override public String call() throws Exception {
                return getToken();
            }            },context().dispatcher());Patterns.pipe(f,context().dispatcher()).to(tokenActor).pipeTo(readFileActor,self());
    }       
}}
Run Code Online (Sandbox Code Playgroud)

或者代替管道

f.onComplete(new OnComplete<String>(){ public void onComplete(Throwable t, String result){ readFileActor.tell(result,self()); } }, context().system().dispatcher());