Thread pool processing messages in parallel, but preserving order within a conversation

Ale*_*der 15 java multithreading

I need to process messages in parallel, but preserve the processing order of messages that have the same conversation ID.

Example:
Let's define a message like this:

class Message {
    Message(long id, long conversationId, String someData) {...}
}

Suppose the messages arrive in the following order:
Message(1,1,"a1"), Message(2,2,"a2"), Message(3,1,"b1"), Message(4,2,"b2").

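I need message 3 to be processed after message 1, since messages 1 and 3 have the same conversation ID (similarly, message 4 must be processed after message 2 for the same reason).
I don't care about the relative order of, e.g., messages 1 and 2, since they have different conversation IDs.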
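To make the requirement concrete, here is a small checker that could be run against a processing log. It is only a sketch: it assumes (hypothetically) that message ids are assigned in arrival order, as in the example above, and represents each processed message as a {id, conversationId} pair; the class name OrderCheck is illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OrderCheck {
    // Returns true if, within every conversation, messages were processed in arrival (id) order.
    // Assumption: ids increase in arrival order; each entry is {id, conversationId}.
    static boolean preservesConversationOrder(List<long[]> processed) {
        Map<Long, Long> lastIdByConversation = new HashMap<>();
        for (long[] m : processed) {
            Long previous = lastIdByConversation.put(m[1], m[0]);
            if (previous != null && previous > m[0]) {
                return false; // a later message of this conversation was processed first
            }
        }
        return true;
    }
}
```

For the example above, the processing order 2, 1, 3, 4 passes, while processing message 3 before message 1 fails.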

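I would like to reuse as much of Java's ThreadPoolExecutor functionality as possible, so that I don't have to replace dead threads etc. manually in my own code.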

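UPDATE: The number of possible 'conversation-ids' is unbounded, and a conversation has no time limit. (Personally I don't think this is a problem, since I can use a simple mapping from conversationId to a worker number, e.g. conversationId % totalWorkers.)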
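A sketch of that mapping (the names LaneMapping and laneFor are hypothetical). One detail worth noting: Java's % operator returns a negative result for a negative left operand, so Math.floorMod is the safer way to turn an arbitrary id into an array index:

```java
class LaneMapping {
    // Maps a conversation to one of totalWorkers lanes.
    // Math.floorMod stays in [0, totalWorkers) even for negative ids, unlike the % operator.
    static int laneFor(long conversationId, int totalWorkers) {
        return (int) Math.floorMod(conversationId, (long) totalWorkers);
    }
}
```

With totalWorkers = 4, conversations 1 and 5 both land in lane 1, which is exactly the situation Update 2 describes: a slow message in conversation 1 delays conversation 5 even if other workers are idle.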

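UPDATE 2: A solution with multiple queues, where the queue number is determined by e.g. 'index = Objects.hash(conversationId) % total', has a problem: if processing some message takes a long time, all messages with the same 'index' but a different 'conversationId' will wait, even though other threads are available to process them. That is why I believe a solution with a single smart blocking queue would be better, but that's just an opinion, and I'm open to any good solution.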

Do you see an elegant solution to this problem?

Hug*_* M. 8

I had to do something very similar some time ago, so here is an adaptation.


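It's really exactly the same basic need, but in my case the key was a String and, more importantly, the set of keys did not grow without bound, so here I had to add a "cleanup scheduler". Apart from that it's basically the same code, so I hope I didn't lose anything serious while adapting it. I tested it and it seems to work. It is, however, longer than the other solutions, and possibly more complex...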

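Basic idea:

  • MessageTask wraps a message into a Runnable, and notifies its queue when it completes
  • ConvoQueue: a blocking queue of messages for one conversation. It acts as a pre-queue that guarantees the desired order. See this trio in particular: ConvoQueue.runNextIfPossible() → MessageTask.run() → ConvoQueue.complete() → …
  • MessageProcessor has a Map<Long, ConvoQueue> and an ExecutorService
  • Messages are processed by any thread of the executor; the ConvoQueues feed the ExecutorService and guarantee message order per convo, but not globally (so a "difficult" message does not block other conversations from being processed, unlike in some other solutions; that property was critically important in our case, and if it isn't important to you, a simpler solution may be better)
  • Cleanup via a ScheduledExecutorService (takes 1 thread)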
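The runNextIfPossible() → run() → complete() trio is the classic "serial executor" pattern; the Javadoc of java.util.concurrent.Executor contains a very similar SerialExecutor example. A compact sketch in that style, only to isolate the mechanism: one instance per conversation would play the role of a ConvoQueue, while the per-conversation map and its cleanup are not covered here.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Runs tasks one at a time, in submission order, on a shared underlying executor.
// At most one task of this instance is handed to the shared pool at any moment.
class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor executor;
    private Runnable active;

    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    @Override
    public synchronized void execute(Runnable r) {
        tasks.add(() -> {
            try {
                r.run();
            } finally {
                scheduleNext(); // plays the role of ConvoQueue.complete()
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    // Plays the role of ConvoQueue.runNextIfPossible(): hand the next task to the pool, if any.
    private synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            executor.execute(active);
        }
    }
}
```

Because the next task is only handed to the pool after the previous one finishes, a slow conversation occupies at most one pool thread and never blocks other conversations.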

Visual:

   ConvoQueues              ExecutorService's internal queue
                            (shared, but has at most 1 MessageTask per convo)
Convo 1   ########   
Convo 2      #####   
Convo 3    #######                        Thread 1
Convo 4              } →    ####    → {
Convo 5        ###                        Thread 2
Convo 6  #########   
Convo 7      #####   

(Convo 4 is about to be deleted)

All the classes are below (MessageProcessorTest can be executed directly):

// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

public class MessageProcessor {

    private static final long CLEANUP_PERIOD_S = 10;
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
    private final ExecutorService executorService;

    public MessageProcessor(int nbThreads) {
        executorService = Executors.newFixedThreadPool(nbThreads);
        ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
        cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

    public void addMessageToProcess(Message message) {
        // Hold the map lock so the cleanup task cannot remove this queue between lookup and add
        synchronized (queuesByConvo) {
            getQueue(message.getConversationId()).addMessage(message);
        }
    }

    private ConvoQueue getQueue(Long convoId) {
        synchronized (queuesByConvo) {
            return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
        }
    }

    private void removeEmptyQueues() {
        synchronized (queuesByConvo) {
            queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
        }
    }

}


// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

class ConvoQueue {

    private Queue<MessageTask> queue;
    private MessageTask activeTask;
    private ExecutorService executorService;

    ConvoQueue(ExecutorService executorService) {
        this.executorService = executorService;
        this.queue = new LinkedBlockingQueue<>();
    }

    private void runNextIfPossible() {
        synchronized(this) {
            if (activeTask == null) {
                activeTask = queue.poll();
                if (activeTask != null) {
                    executorService.submit(activeTask);
                }
            }
        }
    }

    void complete(MessageTask task) {
        synchronized(this) {
            if (task == activeTask) {
                activeTask = null;
                runNextIfPossible();
            }
            else {
                throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
            }
        }
    }

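    // Removable only if nothing is waiting AND nothing is running; otherwise a new
    // ConvoQueue for the same convo could run concurrently with the active task
    synchronized boolean isEmpty() {
        return activeTask == null && queue.isEmpty();
    }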

    void addMessage(Message message) {
        add(new MessageTask(this, message));
    }

    private void add(MessageTask task) {
        synchronized(this) {
            queue.add(task);
            runNextIfPossible();
        }
    }

}

// MessageTask.java
public class MessageTask implements Runnable {

    private ConvoQueue convoQueue;
    private Message message;

    MessageTask(ConvoQueue convoQueue, Message message) {
        this.convoQueue = convoQueue;
        this.message = message;
    }

    @Override
    public void run() {
        try {
            processMessage();
        }
        finally {
            convoQueue.complete(this);
        }
    }

    private void processMessage() {
        // Dummy processing with random delay to observe reordered messages & preserved convo order
        try {
            Thread.sleep((long) (50*Math.random()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(message);
    }

}

// Message.java
class Message {

    private long id;
    private long conversationId;
    private String data;

    Message(long id, long conversationId, String someData) {
        this.id = id;
        this.conversationId = conversationId;
        this.data = someData;
    }

    long getConversationId() {
        return conversationId;
    }

    String getData() {
        return data;
    }

    public String toString() {
        return "Message{" + id + "," + conversationId + "," + data + "}";
    }
}

// MessageProcessorTest.java
public class MessageProcessorTest {
    public static void main(String[] args) {
        MessageProcessor test = new MessageProcessor(2);
        for (int i=1; i<100; i++) {
            test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i));
        }
    }
}

Output (order is preserved per convo ID, the 2nd field):

Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}

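The test above gives me enough confidence to share this, but I'm a bit worried that I may have forgotten details about pathological cases. It has been running in production for years without failures (though with more code that lets us inspect it when we need to see what is going on, why a certain queue is taking time, etc.; there was never a problem with the system above itself, only sometimes with the processing of a specific task).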
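The subtle part of cleanup is that a conversation's entry must never be removed while one of its tasks is still running; otherwise a later message could get a fresh queue and run concurrently with it. One alternative, a swapped-in technique rather than the code above, is to chain each conversation's tasks with CompletableFuture and drop the map entry as soon as that conversation's last task finishes, which removes the need for a cleanup scheduler. A sketch with hypothetical names (KeyedSerialExecutor, submit, trackedKeys):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Per-key ordering by chaining each task onto the previous task of the same key.
// An entry is removed when the key's last task finishes, so no cleanup scheduler is needed.
class KeyedSerialExecutor {
    private final Map<Long, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final Executor pool;

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(long key, Runnable task) {
        AtomicReference<CompletableFuture<Void>> created = new AtomicReference<>();
        // compute() is atomic per key, so the chain for one key stays strictly linear
        tails.compute(key, (k, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            // exceptionally(...) keeps the chain going even if the previous task threw
            CompletableFuture<Void> chained = previous.exceptionally(ex -> null).thenRunAsync(task, pool);
            created.set(chained);
            return chained;
        });
        CompletableFuture<Void> next = created.get();
        // Registered outside compute(): if next is already done, this callback runs right here.
        // remove(key, next) only succeeds if no newer task was chained behind it.
        next.whenComplete((ignored, ex) -> tails.remove(key, next));
        return next;
    }

    int trackedKeys() {
        return tails.size();
    }
}
```

A removed entry can only belong to a completed chain, so a message arriving afterwards may safely start a new chain for the same conversation.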

EDIT: Click here to test it online. Alternatively, copy the gist there and press "Compile & Execute".


Vel*_*lth 4

I'm not sure how you want the messages to be processed. For convenience, each message is of type Runnable, which is where the execution takes place.

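The solution to all of this is to have several Executors that are submitted to a parallel ExecutorService. Use the modulo operation to calculate which Executor an incoming message should be dispatched to. For the same conversation id it is obviously always the same Executor, so messages are processed in parallel overall, but sequentially for any given conversation id. There is no guarantee that messages with different conversation ids always execute in parallel (after all, parallelism is bounded by the number of physical cores in your system, at the very least).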

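import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;

public class MessageExecutor {

    public interface Message extends Runnable {

        long getId();

        long getConversationId();

        String getMessage();

    }

    // One "lane": a long-running task that processes its own queue strictly in FIFO order
    private static class Executor implements Runnable {

        // Sentinel enqueued by stop(): everything queued before it is still processed
        private static final Runnable POISON_PILL = () -> { };

        private final LinkedBlockingQueue<Runnable> messages = new LinkedBlockingQueue<>();

        void schedule(Message message) {
            messages.add(message);
        }

        void stop() {
            // A plain flag would leave a thread blocked in take() forever
            messages.add(POISON_PILL);
        }

        @Override
        public void run() {
            while (true) {
                Runnable message;
                try {
                    message = messages.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // pool is being shut down forcibly
                }
                if (message == POISON_PILL) {
                    return;
                }
                try {
                    message.run();
                } catch (Exception e) {
                    // Catch everything so that one failing message does not stop this lane
                    System.err.println(e.getMessage());
                }
            }
        }
    }

    private final Executor[] executors;
    private final ExecutorService executorService;

    public MessageExecutor(int poolCount) {
        executorService = Executors.newFixedThreadPool(poolCount);
        executors = new Executor[poolCount];

        IntStream.range(0, poolCount).forEach(i -> {
            Executor executor = new Executor();
            executorService.submit(executor);
            executors[i] = executor;
        });
    }

    public void submit(Message message) {
        // floorMod keeps the index non-negative even if the hash is negative
        final int executorNr = Math.floorMod(Objects.hash(message.getConversationId()), executors.length);
        executors[executorNr].schedule(message);
    }

    public void stop() {
        Arrays.stream(executors).forEach(Executor::stop);
        executorService.shutdown();
    }
}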

You can then start the message executor with a pool count and submit messages to it.

public static void main(String[] args) {
    MessageExecutor messageExecutor = new MessageExecutor(Runtime.getRuntime().availableProcessors());
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 1;
        }

        @Override
        public long getConversationId() {
            return 1;
        }

        @Override
        public String getMessage() {
            return "abc1";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 1;
        }

        @Override
        public long getConversationId() {
            return 2;
        }

        @Override
        public String getMessage() {
            return "abc2";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.stop();
}

When I run it with a pool count of 2 and submit a number of messages:

Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]

When the same messages are run with a pool count of 3:

Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]

The messages get nicely distributed over the pool of Executors :).
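The scheduler numbers in the output follow from how Objects.hash treats a single argument: it returns 31 + value.hashCode(), and Long.hashCode(1) is 1, so conversation 1 hashes to 32 (32 % 2 = 0, 32 % 3 = 2). A small sketch reproducing that mapping; the class LaneDemo and method lane are illustrative, and Math.floorMod gives the same result as % here because these hashes are non-negative:

```java
import java.util.Objects;

class LaneDemo {
    // Same formula as MessageExecutor.submit(); for one argument Objects.hash(x) == 31 + x.hashCode()
    static int lane(long conversationId, int poolCount) {
        return Math.floorMod(Objects.hash(conversationId), poolCount);
    }

    public static void main(String[] args) {
        System.out.println(Objects.hash(1L));                                   // 32
        System.out.println(lane(1, 2) + " " + lane(2, 2) + " " + lane(22, 2)); // 0 1 1
        System.out.println(lane(1, 3) + " " + lane(2, 3) + " " + lane(22, 3)); // 2 0 2
    }
}
```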

EDIT: the Executor's run() catches all exceptions, to make sure it does not break when one message fails.
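The same modulo idea can also lean on the JDK for thread management, which the question asked for: each lane can be an Executors.newSingleThreadExecutor(), which runs tasks sequentially in submission order and, per its Javadoc, replaces its thread if that thread dies from a failure before shutdown. A sketch under those assumptions; the names StripedExecutor and shutdownAndAwait are hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Modulo striping where every lane is a JDK single-thread executor:
// FIFO order per lane, and a failed worker thread is replaced automatically.
class StripedExecutor {
    private final ExecutorService[] lanes;

    StripedExecutor(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    void submit(long conversationId, Runnable task) {
        lanes[(int) Math.floorMod(conversationId, (long) lanes.length)].execute(task);
    }

    // Lets already-queued tasks finish, then waits up to the timeout for each lane.
    boolean shutdownAndAwait(long timeout, TimeUnit unit) throws InterruptedException {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        boolean terminated = true;
        for (ExecutorService lane : lanes) {
            terminated &= lane.awaitTermination(timeout, unit);
        }
        return terminated;
    }
}
```

This keeps the head-of-line blocking described in Update 2 (conversations sharing a lane wait for each other), but it needs no hand-written worker loop or stop flag.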

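  • @Alexander I think you're misunderstanding the guarantees of ExecutorService. If a Callable terminates with an exception, there is no guarantee that it will be rescheduled. It does, however, manage the threads and multiplex Callables onto them, and it may replace a thread if that thread somehow dies. If you need a continuously running Callable/Runnable, you have to catch all exceptions, preferably Throwable. (2 upvotes)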