是否有可能通过RabbitMQ发送消息有一些延迟?例如,我希望在30分钟后使客户端会话到期,并且我发送一条消息,该消息将在30分钟后处理.
lam*_*dar 16
您可以尝试两种方法:
旧方法:在每个消息/队列(策略)中设置TTL(生存时间)标头,然后引入DLQ来处理它.一旦ttl过期,您的消息将从DLQ移动到主队列,以便您的侦听器可以处理它.
最新方法:最近,RabbitMQ提出了RabbitMQ延迟消息插件,使用它可以实现相同的功能,并且自RabbitMQ-3.5.8起支持此插件支持.
您可以使用x-delayed-message类型声明交换,然后使用自定义标头x-delay发布消息,以毫秒为单位表示消息的延迟时间.消息将在x延迟毫秒后传递到相应的队列
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new
AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
Run Code Online (Sandbox Code Playgroud)
更多这里:git
Jon*_*ver 13
随着RabbitMQ v2.8的发布,预定的交付现在可用,但作为间接功能:http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
感谢Norman的回答,我可以在NodeJS中实现它.
从代码中可以清楚地看到一切.希望它能节省一些人的时间.
var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});
// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
deadLetterExchange: "my_final_delayed_exchange",
messageTtl: 5000, // 5sec
}, function (err, q) {
ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});
ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
ch.bindQueue(q.queue, "my_final_delayed_exchange", '');
ch.consume(q.queue, function (msg) {
console.log("delayed - [x] %s", msg.content.toString());
}, {noAck: true});
});
Run Code Online (Sandbox Code Playgroud)
小智 7
因为我没有足够的声誉来添加评论,发布一个新的答案.这只是对http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html上已经讨论过的内容的补充.
除了不在消息上设置ttl,您可以在队列级别设置它.此外,您可以避免仅为了将消息重定向到不同的队列而创建新的交换.这是示例java代码:
制片人:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class DelayedProducer {
private final static String QUEUE_NAME = "ParkingQueue";
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
arguments.put("x-dead-letter-exchange", "");
arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME );
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for (int i=0; i<5; i++) {
String message = "This is a sample message " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("message "+i+" got published to the queue!");
Thread.sleep(3000);
}
channel.close();
connection.close();
}
}
Run Code Online (Sandbox Code Playgroud)
消费者:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
Run Code Online (Sandbox Code Playgroud)
看起来这个博客文章描述了使用死信交换和消息ttl做类似的事情.
下面的代码使用CoffeeScript和Node.JS来访问Rabbit并实现类似的东西.
amqp = require 'amqp'
events = require 'events'
em = new events.EventEmitter()
conn = amqp.createConnection()
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
conn.queue key, {
arguments:{
"x-dead-letter-exchange":"immediate"
, "x-message-ttl": 5000
, "x-expires": 6000
}
}, ->
conn.publish key, {v:1}, {contentType:'application/json'}
conn.exchange 'immediate'
conn.queue 'right.now.queue', {
autoDelete: false
, durable: true
}, (q) ->
q.bind('immediate', 'right.now.queue')
q.subscribe (msg, headers, deliveryInfo) ->
console.log msg
console.log headers
Run Code Online (Sandbox Code Playgroud)
这是目前不可能的.您必须将过期时间戳存储在数据库或类似的东西中,然后有一个帮助程序读取这些时间戳并对消息进行排队.
延迟消息是一种经常被请求的功能,因为它们在许多情况下都很有用.但是,如果您需要使客户端会话到期,我相信消息传递不是您理想的解决方案,而另一种方法可能会更好.
| 归档时间: |
|
| 查看次数: |
28343 次 |
| 最近记录: |