56 java messaging multithreading channel rabbitmq
我刚刚阅读了RabbitMQ的Java API文档,发现它非常丰富且直截了当.如何设置简单Channel
的发布/消费的示例非常容易理解.但这是一个非常简单/基本的例子,它给我留下了一个重要的问题:如何设置1+ Channels
来发布/消费多个队列?
比方说,我有上有3个队列中的RabbitMQ服务器:logging
,security_events
和customer_orders
.因此,我们要么需要一个Channel
能够发布/消费所有3个队列,或者更有可能需要3个独立的Channels
队列,每个队列都专用于一个队列.
除此之外,RabbitMQ的最佳实践要求我们Channel
为每个消费者线程设置1 个.对于这个例子,让我们说security_events
是罚款,只有1消费者线程,但logging
并customer_order
都需要5个线程来处理卷.所以,如果我理解正确,这是否意味着我们需要:
Channel
用于发布/消费的1 和1个消费者线程security_events
; 和Channels
用于发布/消费的5 和5个消费者线程logging
; 和Channels
用于发布/消费的5 和5个消费者线程customer_orders
?如果我的理解在这里被误导,请先纠正我.无论哪种方式,一些厌倦战斗的RabbitMQ老手能帮助我"连接点"和一个体面的代码示例来设置符合我要求的发布者/消费者吗?提前致谢!
Ren*_*nov 125
我认为你有几个初步了解的问题.坦率地说,我对以下内容感到有点惊讶:both need 5 threads to handle the volume
.你怎么认出你需要那个确切的数字?你有什么保证5线程就够了吗?
RabbitMQ经过调整和时间测试,所以它都是关于正确的设计和有效的消息处理.
让我们试着回顾一下问题并找到合适的解决方案.顺便说一下,消息队列本身不会提供任何保证,你有很好的解决方案.您必须了解自己在做什么,并进行一些额外的测试.
您肯定知道有很多可能的布局:
我将使用布局B
作为说明1
生产者N
消费者问题的最简单方法.既然你很担心吞吐量.顺便说一句,正如您所料,RabbitMQ表现得非常好(来源).请注意prefetchCount
,我稍后会解决:
因此,消息处理逻辑可能是确保您拥有足够吞吐量的正确位置.当然,每次需要处理消息时,您都可以跨越一个新线程,但最终这种方法会终止您的系统.基本上,您可以获得更多线程,您可以获得更大的延迟(如果需要,您可以查看Amdahl定律).
(参见Amdahl的法律说明)
提示#1:小心线程,使用ThreadPools(详情)
线程池可以描述为Runnable对象(工作队列)的集合和正在运行的线程的连接.这些线程一直在运行,正在检查新工作的工作查询.如果要完成新工作,则执行此Runnable.Thread类本身提供了一个方法,例如execute(Runnable r)将新的Runnable对象添加到工作队列中.
public class Main {
private static final int NTHREDS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Runnable worker = new MyRunnable(10000000L + i);
executor.execute(worker);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
executor.awaitTermination();
System.out.println("Finished all threads");
}
}
Run Code Online (Sandbox Code Playgroud)
提示#2:注意消息处理开销
我会说这是明显的优化技术.您可能会发送小而易于处理的消息.整个方法是关于连续设置和处理的较小消息.大消息最终会发挥不好的笑话,所以最好避免这种情况.
因此最好发送微小的信息,但是处理呢?每次提交工作都会产生管理费用.在传入消息率高的情况下,批处理非常有用.
例如,假设我们有简单的消息处理逻辑,并且我们不希望每次处理消息时都有特定于线程的开销.为了优化那么简单CompositeRunnable can be introduced
:
class CompositeRunnable implements Runnable {
protected Queue<Runnable> queue = new LinkedList<>();
public void add(Runnable a) {
queue.add(a);
}
@Override
public void run() {
for(Runnable r: queue) {
r.run();
}
}
}
Run Code Online (Sandbox Code Playgroud)
或者通过收集要处理的消息以稍微不同的方式执行相同的操作:
class CompositeMessageWorker<T> implements Runnable {
protected Queue<T> queue = new LinkedList<>();
public void add(T message) {
queue.add(message);
}
@Override
public void run() {
for(T message: queue) {
// process a message
}
}
}
Run Code Online (Sandbox Code Playgroud)
通过这种方式,您可以更有效地处理消息.
提示#3:优化消息处理
尽管您知道可以并行处理消息(Tip #1
)并减少处理开销(Tip #2
),但您必须快速完成所有操作.冗余处理步骤,重循环等可能会对性能产生很大影响.请参阅有趣的案例研究:
通过选择正确的XML Parser,将Message Queue吞吐量提高十倍
提示#4:连接和频道管理
(来源)
请注意,所有提示都完美地协同工作.如果您需要其他详细信息,请随时告诉我们.
完整的消费者示例(来源)
请注意以下事项:
prefetchCount
可能非常有用:
此命令允许使用者选择预取窗口,该窗口指定准备接收的未确认消息的数量.通过将预取计数设置为非零值,代理将不会向违反该限制的消费者传递任何消息.要向前移动窗口,消费者必须确认收到消息(或一组消息).
例:
static class Worker extends DefaultConsumer {
String name;
Channel channel;
String queue;
int processed;
ExecutorService executorService;
public Worker(int prefetch, ExecutorService threadExecutor,
, Channel c, String q) throws Exception {
super(c);
channel = c;
queue = q;
channel.basicQos(prefetch);
channel.basicConsume(queue, false, this);
executorService = threadExecutor;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
Runnable task = new VariableLengthTask(this,
envelope.getDeliveryTag(),
channel);
executorService.submit(task);
}
}
Run Code Online (Sandbox Code Playgroud)
您还可以查看以下内容:
Dil*_*eep 20
您可以使用线程和通道实现.您所需要的只是一种对事物进行分类的方法,即登录中的所有队列项,来自security_events等的所有队列元素.可以使用routingKey来实现catagorization.
即:每次向队列添加项目时,请指定路由键.它将作为属性元素附加.通过这种方式,您可以获取特定事件的值,例如记录.
以下代码示例说明了如何在客户端完成它.
例如:
路由密钥用于标识信道的类型并检索类型.
例如,如果您需要获取有关Login类型的所有通道,则必须将路由键指定为login或其他一些关键字来标识它.
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
string routingKey="login";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
Run Code Online (Sandbox Code Playgroud)
您可以在此处查看有关分类的更多详细信息..
一旦发布部分结束,您可以运行线程部分..
在这部分中,您可以根据类别获取已发布的数据.即; 路由密钥,在您的情况下是日志记录,security_events和customer_orders等.
查看示例以了解如何检索线程中的数据.
例如:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//**The threads part is as follows**
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
// This part will biend the queue with the severity (login for eg:)
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
}
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.contentType;
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
Run Code Online (Sandbox Code Playgroud)
现在创建一个处理类型login(路由键)的队列中的数据的线程.通过这种方式,您可以创建多个线程.每个服务目的不同.
看这里有关螺纹部分的详细信息..