Mr.*_*ndy 6 java asynchronous message-passing producer-consumer
我最初在这里问过这个问题,但我意识到我的问题不是关于一个真正的循环.我想知道的是,在Java中进行高性能异步消息传递的正确方法是什么?
我想做什么......
我有大约10,000名消费者,每个消费者都从他们的私人队列中消费消息.我有一个线程一个接一个地生成消息并将它们放在正确的消费者队列中.每个消费者无限循环,检查消息是否出现在队列中并进行处理.
我认为这个术语是"单一生产者/单一消费者",因为有一个生产者,每个消费者只能在他们的私人队列上工作(多个消费者从不从同一个队列中读取).
在Consumer.java里面:
@Override
public void run() {
while (true) {
Message msg = messageQueue.poll();
if (msg != null) {
... // do something with the message
}
}
}
Run Code Online (Sandbox Code Playgroud)
Producer正在快速地将消息放入消费者消息队列中(每秒数百万条消息).消费者应该尽快处理这些消息!
注意:while (true) { ... }由Producer作为最后一条消息发送的KILL消息终止.
但是,我的问题是关于设计此消息传递的正确方法.我应该为messageQueue使用什么样的队列?应该是同步还是异步?如何设计消息?我应该使用while-true循环吗?消费者应该是一个线程,还是其他什么?10,000个线程会慢慢爬行吗?什么是线程的替代品?
那么,在Java中进行高性能消息传递的正确方法是什么?
我会说10,000个线程的上下文切换开销会非常高,更不用说内存开销了.默认情况下,在32位平台上,每个线程使用256kb的默认堆栈大小,因此只有堆栈的2.5GB.显然你说的是64位,但即便如此,那相当大的内存.由于使用的内存量很大,缓存将会颠覆很多,而cpu将受到内存带宽的限制.
我会寻找一种设计,避免使用如此多的线程来避免分配大量的堆栈和上下文切换开销.您不能同时处理10,000个线程.当前的硬件通常少于100个核心.
我会为每个硬件线程创建一个队列并以循环方式分发消息.如果处理时间变化很大,则存在一些线程在给予更多工作之前完成处理队列的危险,而其他线程从未完成其分配的工作.这可以通过使用JSR-166 ForkJoin框架中实现的工作窃取来避免.
由于通信是从发布者到订阅者的一种方式,因此Message不需要任何特殊设计,假设订阅者在发布后不更改消息.
编辑:读取注释,如果你有10,000个符号,那么创建一些通用订阅者线程(每个核心一个订阅者线程),异步收到来自发布者的消息(例如通过他们的消息队列).订阅者从队列中提取消息,从消息中检索符号,并在消息处理程序的Map中查找它,检索处理程序,并调用处理程序以同步处理消息.完成后,它会重复,从队列中获取下一条消息.如果必须按顺序处理相同符号的消息(这就是为什么我猜你想要10,000个队列.),你需要将符号映射到订阅者.例如,如果有10个订户,则符号0-999转到订户0,1000-1999转到订户1等.更精细的方案是根据频率分布映射符号,以便每个用户获得大致相同的负载.例如,如果10%的流量是符号0,则订户0将仅处理该一个符号,而其他符号将在其他订户之间分配.