Java:具有多个队列的Executor服务

nir*_*raj 7 java multithreading executorservice

需求:

  1. 我将消息分组为不同类型,例如Type1, Type2 ... Type100.
  2. 我想并行执行不同类型的消息.让我们说10个线程,但所有相同类型的消息必须逐个执行.执行顺序无关紧要.
  3. 一旦一个线程完成了所有的消息TypeX.它应该开始处理另一个Type.

我经历了不同的答案:他们中的大多数建议执行器服务来处理多线程.假设我们创建执行服务

ExecutorService executorService = Executors.newFixedThreadPool(10);
Run Code Online (Sandbox Code Playgroud)

但是一旦我们使用了提交消息 executorService.submit(runnableMessage);

我们无法控制仅将特定类型的消息分配给特定线程.

解:

创建单线程执行程序数组

ExecutorService[] pools = new ExecutorService[10];
Run Code Online (Sandbox Code Playgroud)

并且最初传递Type1,Type2 ... Type10的消息,然后如果任何执行程序已完成执行,则将Type11分配给它并继续执行直到所有类型都被处理.

有没有更好的方法呢?

类似于具有多个队列的执行器服务,我可以将每种类型的消息推送到不同的队列?

Chr*_*s K 6

我建议你看一下Akka.它们提供了一个更适合这个用例的Actor框架.如果没有定义自己的ExecutorService接口实现,那么JDK提供的默认实现并没有给出对调度有多大控制权.

创建ExecutionServices的硬编码数组不会非常动态或强大,特别是每个ExecutionService会有一个线程池.可以使用哈希映射替换该数组,然后将其置于ExecutionService的自定义实现之后,这将具有从调用者隐藏这些细节的优点,但它不会解决具有如此多线程池的线程浪费.

在Akka中,每个Actor都有自己的消息队列.每个Actor都有效地在自己的线程中运行,从队列中一次处理一条消息.Akka将管理多个Actors之间的线程共享.因此,如果您要为每种消息类型创建一个Actor,然后使用这些Actors排队消息,那么您将获得目标,即每次最多一个线程处理每个消息类型,同时仅由一个池支持线程.

演示技巧:

Maven对Akka的依赖.

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>2.4.17</version>
    </dependency>
Run Code Online (Sandbox Code Playgroud)

Java 8代码.复制并粘贴到Java文件中,然后在IDE中运行main方法.

package com.softwaremosaic.demos.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;


public class ActorDemo {

    public static void main( String[] args ) throws InterruptedException {
        // The following partitioner will spread the requests over
        // multiple actors, which I chose to demonstrate the technique.
        // You will need to change it to one that better maps the the
        // jobs to your use case.   Remember that jobs that get mapped
        // to the same key, will get executed in serial (probably
        // but not necessarily) by the same thread.
        ExecutorService exectorService = new ActorExecutionService( job -> job.hashCode()+"" );

        for ( int i=0; i<100; i++ ) {
            int id = i;
            exectorService.submit( () -> System.out.println("JOB " + id) );
        }

        exectorService.shutdown();
        exectorService.awaitTermination( 1, TimeUnit.MINUTES );

        System.out.println( "DONE" );
    }

}


class ActorExecutionService extends AbstractExecutorService {

    private final ActorSystem                              actorSystem;
    private final Function<Runnable, String>               partitioner;
    private final ConcurrentHashMap<String,ActorRef>       actors = new ConcurrentHashMap<>();

    public ActorExecutionService( Function<Runnable,String> partitioner ) {
        this.actorSystem = ActorSystem.create("demo");
        this.partitioner = partitioner;
    }


    public void execute( Runnable command ) {
        String partitionKey = partitioner.apply( command );

        ActorRef actorRef = actors.computeIfAbsent( partitionKey, this::createNewActor );

        actorRef.tell( command, actorRef );
    }

    private ActorRef createNewActor( String partitionKey ) {
        return actorSystem.actorOf( Props.create(ExecutionServiceActor.class), partitionKey );
    }


    public void shutdown() {
        actorSystem.terminate();
    }

    public List<Runnable> shutdownNow() {
        actorSystem.terminate();

        try {
            awaitTermination( 1, TimeUnit.MINUTES );
        } catch ( InterruptedException e ) {
            throw new RuntimeException( e );
        }

        return Collections.emptyList();
    }

    public boolean isShutdown() {
        return actorSystem.isTerminated();
    }

    public boolean isTerminated() {
        return actorSystem.isTerminated();
    }

    public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
        actorSystem.awaitTermination();

        return actorSystem.isTerminated();
    }
}

 class ExecutionServiceActor extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof Runnable) {
            ((Runnable) message).run();
        } else {
            unhandled(message);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

注意上面的代码将以未定义的顺序打印1-100.由于批处理(Akka确实可以获得额外的性能优势),订单看起来大多是串行的.但是,当不同的线程散布在工作中时,您会看到数字的随机性.每个作业运行的时间越长,分配给Akka线程池的线程越多,使用的分区键越多,底层CPU核心越多,序列可能变得越随机.


nir*_*raj 2

一个更简单的解决方案可能是:

而不是让每条消息都可运行。我们可以根据消息类型创建群组消息:

例如,我们为type1的所有消息创建Group1

class MessageGroup implements Runnable {
    String type;
    String List<Message> messageList;

    @Override
    public void run() {
      for(Message message : MessageList) {
         message.process();
      }
    }
} 
Run Code Online (Sandbox Code Playgroud)

我们可以使用固定线程创建常用的执行器服务,例如

ExecutorService executorService = Executors.newFixedThreadPool(10); 
Run Code Online (Sandbox Code Playgroud)

我们可以提交一组消息,而不是提交单个消息,例如

executorService.submit(runnableGroup);
Run Code Online (Sandbox Code Playgroud)

每个组都会在同一个线程中顺序执行相同类型的消息。