没有订阅者的RabbitMQ队列

Mr *_*rok 10 c# rabbitmq

"持久"和"持久模式"似乎与重新启动有关,而不是与没有订户接收消息有关.

我希望RabbitMQ在没有订阅者时将消息保留在队列中.当订户上线时,该订户应该收到该消息.RabbitMQ可以实现吗?

代码示例:

服务器:

namespace RabbitEg
{
    class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        static void Main(string[] args)
        {
            ConnectionFactory cnFactory = new RabbitMQ.Client.ConnectionFactory() { HostName = "localhost" };

            using (IConnection cn = cnFactory.CreateConnection())
            {
                using (IModel channel = cn.CreateModel())
                {
                    //channel.ExchangeDelete(EXCHANGE_NAME);
                    channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true);
                    //channel.BasicReturn += new BasicReturnEventHandler(channel_BasicReturn);

                    for (int i = 0; i < 100; i++)
                    {
                        byte[] payLoad = Encoding.ASCII.GetBytes("hello world _ " + i);
                        IBasicProperties channelProps = channel.CreateBasicProperties();
                        channelProps.SetPersistent(true);

                        channel.BasicPublish(EXCHANGE_NAME, "routekey_helloworld", false, false, channelProps, payLoad);

                        Console.WriteLine("Sent Message " + i);
                        System.Threading.Thread.Sleep(25);
                    }

                    Console.ReadLine();
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

客户:

namespace RabbitListener
{
    class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        static void Main(string[] args)
        {
            ConnectionFactory cnFactory = new ConnectionFactory() { HostName = "localhost" };

            using (IConnection cn = cnFactory.CreateConnection())
            {
                using (IModel channel = cn.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true);

                    string queueName = channel.QueueDeclare("myQueue", true, false, false, null);
                    channel.QueueBind(queueName, EXCHANGE_NAME, "routekey_helloworld");

                    Console.WriteLine("Waiting for messages");

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        Console.WriteLine(Encoding.ASCII.GetString(e.Body));
                    }
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

scv*_*lex 14

有关含义和含义的解释,请参阅AMQP参考.durablepersistent

基本上,队列durable或者non-durable.前者生存经纪人重启,后者则没有.

消息作为transient或发布persistent.我们的想法是队列上的persistent消息durable也应该在代理重启时继续存在.

因此,为了得到你想要的东西,你需要1)将队列声明为durable和2)将消息发布为persistent.此外,您可能还希望在频道上启用发布商确认 ; 这样,您就会知道经纪人何时承担了该消息的责任.