Ale*_*der 15 java multithreading
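I need to process messages in parallel, but preserve the processing order of messages that have the same conversation ID.
Example:
Let's define a message like this: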
class Message {
    Message(long id, long conversationId, String someData) {...}
}
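Suppose the messages arrive in the following order:
Message(1,1,"a1"), Message(2,2,"a2"), Message(3,1,"b1"), Message(4,2,"b2").
I need to process message 3 after message 1, because messages 1 and 3 have the same conversation ID. Similarly, message 4 should be processed after message 2, for the same reason.
I don't care about, for example, the relative order of messages 1 and 2, because they have different conversation IDs.
I'd like to reuse as much of Java's ThreadPoolExecutor functionality as possible, so that I don't have to replace dead threads etc. manually in my own code.
Update: the number of possible conversation IDs is unbounded, and conversations have no time limit. (Personally I don't think this is a problem, because I can do a simple mapping from conversationId to a worker number, e.g. conversationId % totalWorkers.)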
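The conversationId % totalWorkers idea from the update can be combined with stock JDK executors, so dead-thread replacement comes for free. This is a minimal sketch that assumes a hypothetical StripedExecutor class. Each stripe is an Executors.newSingleThreadExecutor(), and according to its Javadoc, a new thread takes over if the single thread dies from a failure. Math.floorMod replaces %, so negative ids cannot produce a negative index. The sketch still has the limitation raised in Update 2: conversations that land on the same stripe wait for each other.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one single-thread executor per stripe.
// Same conversationId -> same stripe -> FIFO, strictly sequential processing.
final class StripedExecutor {
    private final ExecutorService[] stripes;

    StripedExecutor(int totalWorkers) {
        stripes = new ExecutorService[totalWorkers];
        for (int i = 0; i < totalWorkers; i++) {
            // The JDK replaces this stripe's thread if a task kills it
            stripes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // floorMod instead of %: a negative conversationId would otherwise give a negative index
    static int stripeFor(long conversationId, int totalWorkers) {
        return (int) Math.floorMod(conversationId, (long) totalWorkers);
    }

    void submit(long conversationId, Runnable task) {
        stripes[stripeFor(conversationId, stripes.length)].execute(task);
    }

    // Stops accepting work, then waits (up to a minute per stripe) for queued tasks to finish
    void shutdownAndAwait() {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
        }
        try {
            for (ExecutorService stripe : stripes) {
                stripe.awaitTermination(1, TimeUnit.MINUTES);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```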
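Update 2: solutions with several queues have a problem when the queue number is determined by something like 'index = Objects.hash(conversationId) % total'. If processing some message takes a long time, every message with the same 'index' but a different 'conversationId' has to wait, even when other threads are free to process it. That is why I believe a solution with a single smart blocking queue would be better. That's only an opinion, though, and I'm open to any good solution.
Do you see an elegant solution to this problem?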
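I had to do something very similar some time ago, so here is an adaptation.
(See it online)
It's actually exactly the same basic need. In my case, though, the key was a String, and more importantly the set of keys did not grow indefinitely, so here I had to add a "cleanup scheduler". Apart from that it's basically the same code, so I hope I didn't lose anything important while adapting it. I tested it and it seems to work. It is longer than the other solutions, however, and possibly more complex...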
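Basic idea:
- MessageTask: wraps a message in a Runnable and notifies its queue when it completes.
- ConvoQueue: a blocking queue of messages for one conversation. It acts as a pre-queue that guarantees the required order. Note this trio in particular: ConvoQueue.runNextIfPossible() → MessageTask.run() → ConvoQueue.complete() → ...
- MessageProcessor: holds a Map<Long, ConvoQueue> and an ExecutorService.
- Messages are processed by the threads of that ExecutorService. The ConvoQueues feed the ExecutorService and guarantee message order per convo, but not globally. As a result, a "difficult" message does not block other conversations from being processed, unlike some other solutions. That property was critically important in our case; if it doesn't matter for you, a simpler solution may be better.
- Cleanup runs on a ScheduledExecutorService (which needs 1 thread).
Visually: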
ConvoQueues             ExecutorService's internal queue
                        (shared, but has at most 1 MessageTask per convo)
Convo 1  ########
Convo 2     #####
Convo 3   #######                                Thread 1
Convo 4 }  →  ####  →  {
Convo 5       ###                                Thread 2
Convo 6 #########
Convo 7     #####

(Convo 4 is about to be deleted)
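The ConvoQueue trio described above (runNextIfPossible → run → complete) follows the same pattern as the SerialExecutor example in the java.util.concurrent.Executor Javadoc. The sketch below is adapted from that Javadoc example. It is not the author's code, and the comments mapping it onto ConvoQueue are an interpretation. One instance would stand in for one ConvoQueue, and all instances share one thread pool.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Adapted from the SerialExecutor example in the java.util.concurrent.Executor Javadoc.
// One instance per conversation; `executor` is the shared thread pool.
final class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>(); // pending tasks, guarded by this
    private final Executor executor;
    private Runnable active; // task currently handed to the pool (cf. ConvoQueue.activeTask)

    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    @Override
    public synchronized void execute(Runnable r) {
        // Wrap the task so that finishing it schedules the next one (cf. ConvoQueue.complete)
        tasks.add(() -> {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    // cf. ConvoQueue.runNextIfPossible: at most one task of this conversation is in the pool
    private synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            executor.execute(active);
        }
    }
}
```

Several SerialExecutor instances can share one fixed thread pool. Each one is strictly sequential, and different instances run in parallel.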
All the classes are below (MessageProcessorTest can be run directly):
// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static java.util.concurrent.TimeUnit.SECONDS;

public class MessageProcessor {
    private static final long CLEANUP_PERIOD_S = 10;
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
    private final ExecutorService executorService;

    public MessageProcessor(int nbThreads) {
        executorService = Executors.newFixedThreadPool(nbThreads);
        // One background thread periodically drops the queues of idle conversations
        ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
        cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

    public void addMessageToProcess(Message message) {
        // Look up and enqueue under the same lock, so that removeEmptyQueues() cannot drop
        // the queue in between (a second queue for the same convo could then run concurrently)
        synchronized (queuesByConvo) {
            getQueue(message.getConversationId()).addMessage(message);
        }
    }

    private ConvoQueue getQueue(Long convoId) {
        synchronized (queuesByConvo) {
            return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
        }
    }

    private void removeEmptyQueues() {
        synchronized (queuesByConvo) {
            queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
        }
    }
}
// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

class ConvoQueue {
    private final Queue<MessageTask> queue;
    private MessageTask activeTask; // the one task of this convo handed to the executor; guarded by this
    private final ExecutorService executorService;

    ConvoQueue(ExecutorService executorService) {
        this.executorService = executorService;
        this.queue = new LinkedBlockingQueue<>();
    }

    // Hands the next pending task to the shared executor, but only if no task of this convo is running
    private void runNextIfPossible() {
        synchronized (this) {
            if (activeTask == null) {
                activeTask = queue.poll();
                if (activeTask != null) {
                    executorService.submit(activeTask);
                }
            }
        }
    }

    // Called by MessageTask.run() when it finishes, so the next message of this convo can start
    void complete(MessageTask task) {
        synchronized (this) {
            if (task == activeTask) {
                activeTask = null;
                runNextIfPossible();
            } else {
                throw new IllegalStateException("Attempt to complete task that is not supposed to be active: " + task);
            }
        }
    }

    // "Empty" must also mean "nothing running". Otherwise the cleanup could drop this queue while its
    // last task is still active, and a new queue for the same convo could run a message concurrently.
    synchronized boolean isEmpty() {
        return activeTask == null && queue.isEmpty();
    }

    void addMessage(Message message) {
        add(new MessageTask(this, message));
    }

    private void add(MessageTask task) {
        synchronized (this) {
            queue.add(task);
            runNextIfPossible();
        }
    }
}
// MessageTask.java
public class MessageTask implements Runnable {
    private final ConvoQueue convoQueue;
    private final Message message;

    MessageTask(ConvoQueue convoQueue, Message message) {
        this.convoQueue = convoQueue;
        this.message = message;
    }

    @Override
    public void run() {
        try {
            processMessage();
        } finally {
            // Always hand control back to the queue, otherwise this convo would stall forever
            convoQueue.complete(this);
        }
    }

    private void processMessage() {
        // Dummy processing with random delay to observe reordered messages & preserved convo order
        try {
            Thread.sleep((long) (50 * Math.random()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        }
        System.out.println(message);
    }
}
// Message.java
class Message {
    private final long id;
    private final long conversationId;
    private final String data;

    Message(long id, long conversationId, String someData) {
        this.id = id;
        this.conversationId = conversationId;
        this.data = someData;
    }

    long getConversationId() {
        return conversationId;
    }

    String getData() {
        return data;
    }

    @Override
    public String toString() {
        return "Message{" + id + "," + conversationId + "," + data + "}";
    }
}
// MessageProcessorTest.java
public class MessageProcessorTest {
    public static void main(String[] args) {
        MessageProcessor test = new MessageProcessor(2);
        for (int i = 1; i < 100; i++) {
            // 7 interleaved conversations (ids 0..6)
            test.addMessageToProcess(new Message(1000 + i, i % 7, "hi " + i));
        }
    }
}
Output (the order within each convo ID, the 2nd field, is preserved):
Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}
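Checking output like this by eye is error-prone, so here is a small checker for the property being claimed: within each conversation ID, message IDs must appear in increasing order. It is a sketch that assumes the exact Message{id,convo,data} format printed by Message.toString() above, and ConvoOrderChecker is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker for lines printed by MessageTask, e.g. "Message{1009,2,hi 9}".
final class ConvoOrderChecker {
    private static final Pattern LINE = Pattern.compile("Message\\{(\\d+),(-?\\d+),.*\\}");

    // True if, within each conversation id, message ids appear in strictly increasing order
    static boolean perConversationOrderPreserved(List<String> lines) {
        Map<Long, Long> lastIdByConvo = new HashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (!m.matches()) {
                continue; // skip "..." and anything else that is not a message line
            }
            long id = Long.parseLong(m.group(1));
            long convo = Long.parseLong(m.group(2));
            Long previous = lastIdByConvo.put(convo, id);
            if (previous != null && previous >= id) {
                return false;
            }
        }
        return true;
    }
}
```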
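The test above gives me enough confidence to share this, but I'm a bit worried I may have overlooked details of pathological cases. It has been running in production for years without failures. (In production there is more code that lets us inspect it when we need to see what is happening, why a given queue is taking time, and so on. The system above itself has never been the problem, though processing specific tasks sometimes has.)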
Edit: click here to test it online. Alternatively, copy the gist there and press "Compile & Execute".
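For comparison, the same per-conversation chaining can be sketched more compactly with ConcurrentHashMap.compute and CompletableFuture. This is a swapped-in technique, not the approach from this answer, and KeyedSerialExecutor is a hypothetical name. The assumption is that each conversation only needs a handle to its most recently submitted task. compute() is atomic per key, so two submits for the same conversation cannot interleave. The entry is removed once its last task finishes, so no cleanup scheduler is needed. Conversations never share a stripe, which avoids the problem described in Update 2.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical alternative: per-conversation ordering by chaining CompletableFutures.
final class KeyedSerialExecutor {
    private static final CompletableFuture<Void> DONE = CompletableFuture.completedFuture(null);

    private final Executor executor;
    // conversationId -> future of the last task submitted for that conversation
    private final ConcurrentHashMap<Long, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    KeyedSerialExecutor(Executor executor) {
        this.executor = executor;
    }

    CompletableFuture<Void> submit(long conversationId, Runnable task) {
        // compute() runs atomically per key, so the new task is always chained after the previous one
        CompletableFuture<Void> next = tails.compute(conversationId, (id, tail) ->
                (tail == null ? DONE : tail.exceptionally(e -> null)) // a failed task must not stall its successors
                        .thenRunAsync(task, executor));
        // Drop the entry when this task finishes; the two-argument remove() does nothing
        // if a newer task of the same conversation has been chained in the meantime
        next.whenComplete((r, e) -> tails.remove(conversationId, next));
        return next;
    }
}
```

The completion callback is registered outside compute() on purpose. If next were already complete, registering inside compute() would run the removal while the map entry is being computed.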
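I'm not sure how you want to process the messages. For convenience, each message here is a Runnable, and its run() method is where the processing happens.
The solution to all of this is to submit several Executor loops to an ExecutorService, which runs them in parallel. A modulo operation computes which Executor an incoming message is dispatched to. The same conversation id always maps to the same Executor, so processing is parallel overall but sequential within a conversation id. There is no guarantee that messages with different conversation IDs always run in parallel (after all, you are at least limited by the number of physical cores in your system).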
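One caveat about the modulo step: Objects.hash returns an int that can be negative, and Java's % takes the sign of the dividend, so Objects.hash(id) % n can produce a negative array index. The demo below uses conversation id 2^31, whose Long.hashCode is Integer.MIN_VALUE. The class and method names are hypothetical.

```java
import java.util.Objects;

// Hypothetical demo: why `Objects.hash(id) % n` needs Math.floorMod.
final class StripeIndexDemo {
    static int naiveIndex(long conversationId, int poolCount) {
        return Objects.hash(conversationId) % poolCount; // can be negative
    }

    static int safeIndex(long conversationId, int poolCount) {
        return Math.floorMod(Objects.hash(conversationId), poolCount); // always in [0, poolCount)
    }

    public static void main(String[] args) {
        long id = 2_147_483_648L; // 2^31: Objects.hash(id) == 31 + Integer.MIN_VALUE
        System.out.println(naiveIndex(id, 3)); // -1, i.e. ArrayIndexOutOfBoundsException in executors[...]
        System.out.println(safeIndex(id, 3));  // 2
    }
}
```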
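import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class MessageExecutor {
    public interface Message extends Runnable {
        long getId();
        long getConversationId();
        String getMessage();
    }

    // One sequential worker loop per stripe; each loop occupies one thread of the shared pool
    private static class Executor implements Runnable {
        private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
        private volatile boolean stopped;

        void schedule(Message message) {
            messages.add(message);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            // After stop(), keep going until the already-scheduled messages are drained
            while (!stopped || !messages.isEmpty()) {
                Message message;
                try {
                    // poll() with a timeout instead of take(), so a stopped loop is not blocked forever
                    message = messages.poll(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (message == null) {
                    continue;
                }
                try {
                    message.run();
                } catch (Exception e) {
                    // One failing message must not end this stripe's loop
                    System.err.println(e.getMessage());
                }
            }
        }
    }

    private final Executor[] executors;
    private final ExecutorService executorService;

    public MessageExecutor(int poolCount) {
        executorService = Executors.newFixedThreadPool(poolCount);
        executors = new Executor[poolCount];
        IntStream.range(0, poolCount).forEach(i -> {
            Executor executor = new Executor();
            executorService.submit(executor);
            executors[i] = executor;
        });
    }

    public void submit(Message message) {
        // floorMod keeps the index non-negative even when the hash is negative
        final int executorNr = Math.floorMod(Objects.hash(message.getConversationId()), executors.length);
        executors[executorNr].schedule(message);
    }

    public void stop() {
        Arrays.stream(executors).forEach(Executor::stop);
        executorService.shutdown();
    }
}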
You can then create the message executor with a pool count and submit messages to it.
// Inside MessageExecutor, so that Message resolves to MessageExecutor.Message
public static void main(String[] args) {
    MessageExecutor messageExecutor = new MessageExecutor(Runtime.getRuntime().availableProcessors());
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 1;
        }
        @Override
        public long getConversationId() {
            return 1;
        }
        @Override
        public String getMessage() {
            return "abc1";
        }
        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 2;
        }
        @Override
        public long getConversationId() {
            return 2;
        }
        @Override
        public String getMessage() {
            return "abc2";
        }
        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.stop();
}
When I run it with a pool count of 2 and submit a number of messages:
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
When the same messages are run with a pool count of 3:
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
The messages are nicely distributed over the pool of Executors :).
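The scheduler numbers in both logs can be recomputed from the index expression in submit(). For small positive ids, Objects.hash(id) is simply 31 + id. The recomputation also makes the trade-off from Update 2 concrete: with a pool count of 2, conversations 1 and 3 both land on scheduler #0, so a slow message in conversation 1 delays conversation 3 even while scheduler #1 is idle. StripeTable is a hypothetical helper.

```java
import java.util.Objects;
import java.util.StringJoiner;

// Hypothetical helper: recompute "conversation id -> scheduler #" the way submit() does.
final class StripeTable {
    static int schedulerFor(long conversationId, int poolCount) {
        // For these small positive ids, % and Math.floorMod give the same result
        return Math.floorMod(Objects.hash(conversationId), poolCount);
    }

    static String row(int poolCount, long... conversationIds) {
        StringJoiner joiner = new StringJoiner(" ");
        for (long id : conversationIds) {
            joiner.add(id + "->#" + schedulerFor(id, poolCount));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(row(2, 1, 2, 3, 4, 22)); // 1->#0 2->#1 3->#0 4->#1 22->#1
        System.out.println(row(3, 1, 2, 3, 4, 22)); // 1->#2 2->#0 3->#1 4->#2 22->#2
    }
}
```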
Edit: the Executor's run() catches all exceptions, so that one failing message does not break the loop.
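That catch matters because each Executor loop is handed to executorService.submit(). If message.run() threw out of the loop, the exception would be captured in a Future that nobody reads. The pool thread survives, but the loop itself ends, nothing restarts it, and every conversation mapped to that stripe stalls. The demo below, with the hypothetical names LoopFailureDemo and runLoop, contrasts the two behaviours on a three-message loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo: a loop submitted to an ExecutorService ends at the first uncaught
// exception, and submit() hides that exception inside the returned Future.
final class LoopFailureDemo {
    static List<String> runLoop(boolean catchPerMessage) {
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> loop = pool.submit(() -> {
            for (String msg : List.of("ok1", "boom", "ok2")) {
                try {
                    if (msg.equals("boom")) {
                        throw new IllegalStateException("failed on " + msg);
                    }
                    processed.add(msg);
                } catch (RuntimeException e) {
                    if (!catchPerMessage) {
                        throw e; // rethrow: ends the whole loop
                    }
                    System.err.println(e.getMessage()); // log and continue with the next message
                }
            }
        });
        try {
            loop.get();
        } catch (ExecutionException e) {
            // Without the per-message catch, this is the only place the failure shows up
            System.err.println("loop died: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return processed;
    }
}
```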