从多个队列中读取,RabbitMQ

Dem*_*emi 7 .net c# amqp rabbitmq

我是RabbitMQ的新手.我希望能够在有多个队列(要读取)的情况下处理读取消息而不会阻塞.有关如何做到这一点的任何意见?

//编辑1

public class Rabbit : IMessageBus
{   

    private List<string> publishQ = new List<string>();
    private List<string> subscribeQ = new List<string>();

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;  
    Subscription sub = null;

    public void writeMessage( Measurement m1 ) {
        byte[] body = Measurement.AltSerialize( m1 );
        int msgCount = 1;
        Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id);

        string finalQueue = publishToQueue( m1.id );

        while (msgCount --> 0) {
            channel.BasicPublish("amq.direct", finalQueue, null, body);
        }

        Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
    }

     public string publishToQueue(string firstQueueName) {
        Console.WriteLine("Creating a queue and binding it to amq.direct");
        string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
        channel.QueueBind(queueName, "amq.direct", queueName, null);
        Console.WriteLine("Done.  Created queue {0} and bound it to amq.direct.\n", queueName);
        return queueName;
    }


    public Measurement readMessage() {
        Console.WriteLine("Receiving message...");
        Measurement m = new Measurement();

        int i = 0;
        foreach (BasicDeliverEventArgs ev in sub) {
            m = Measurement.AltDeSerialize(ev.Body);
            //m.id = //get the id here, from sub
            if (++i == 1)
                break;
            sub.Ack();
        }

        Console.WriteLine("Done.\n");
        return m;
    }


    public void subscribeToQueue(string queueName ) 
    {
        sub = new Subscription(channel, queueName);
    }

    public static string MsgSysName;
    public string MsgSys
    {
        get 
        { 
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    public Rabbit(string _msgSys) //Constructor
    {   
        factory = new ConnectionFactory();
        factory.HostName = "localhost"; 
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        //consumer = new QueueingBasicConsumer(channel);

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    ~Rabbit()
    {
        //observer??
        connection.Dispose();
        //channel.Dispose();
        System.Console.WriteLine("\nDestroying RABBIT");
    }   
}
Run Code Online (Sandbox Code Playgroud)

//编辑2

private List<Subscription> subscriptions = new List<Subscription>();
    Subscription sub = null;

public Measurement readMessage()
    {
        Measurement m = new Measurement();
        foreach(Subscription element in subscriptions)
        {
            foreach (BasicDeliverEventArgs ev in element) {
                //ev = element.Next();
                if( ev != null) {
                    m = Measurement.AltDeSerialize( ev.Body );
                    return m;
                }
                m =  null;  
            }           
        }   
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return m;
    }

    public void subscribeToQueue(string queueName) 
    {   
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub);     
    }
Run Code Online (Sandbox Code Playgroud)

//编辑3

//MessageHandler.cs

public class MessageHandler
{   
    // Implementation of methods for Rabbit class go here
    private List<string> publishQ = new List<string>();
    private List<string> subscribeQ = new List<string>();

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;  
    QueueingBasicConsumer consumer = null;  

    private List<Subscription> subscriptions = new List<Subscription>();
    Subscription sub = null;

    public void writeMessage ( Measurement m1 )
    {
        byte[] body = Measurement.AltSerialize( m1 );
        //declare a queue if it doesn't exist
        publishToQueue(m1.id);

        channel.BasicPublish("amq.direct", m1.id, null, body);
        Console.WriteLine("\n  [x] Sent to queue {0}.", m1.id);
    }

    public void publishToQueue(string queueName)
    {   
        string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
        channel.QueueBind(finalQueueName, "amq.direct", "", null);
    }

    public Measurement readMessage()
    {
        Measurement m = new Measurement();
        foreach(Subscription element in subscriptions)
        {
            if( element.QueueName == null)
            {
                m = null;
            }
            else 
            {
                BasicDeliverEventArgs ev = element.Next();
                if( ev != null) {
                    m = Measurement.AltDeSerialize( ev.Body );
                    m.id = element.QueueName;
                    element.Ack();
                    return m;
                }
                m =  null;                      
            }
            element.Ack();
        }   
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return m;
    }

    public void subscribeToQueue(string queueName) 
    {   
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub); 
    }

    public static string MsgSysName;
    public string MsgSys
    {
        get 
        { 
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    public MessageHandler(string _msgSys) //Constructor
    {   
        factory = new ConnectionFactory();
        factory.HostName = "localhost"; 
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        consumer = new QueueingBasicConsumer(channel);

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    public void disposeAll()
    {
        connection.Dispose();
        channel.Dispose();
        foreach(Subscription element in subscriptions)
        {
            element.Close();
        }
        System.Console.WriteLine("\nDestroying RABBIT");
    }   
}
Run Code Online (Sandbox Code Playgroud)

//App1.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;


public class MainClass
{
    public static void Main()
    {

    MessageHandler obj1 = MessageHandler("Rabbit");

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    //Create new Measurement messages
    Measurement m1 = new Measurement("q1", 2345, 23.456); 
    Measurement m2 = new Measurement("q2", 222, 33.33);

    System.Console.WriteLine("Test message 1:\n    ID: {0}", m1.id);
    System.Console.WriteLine("    Time: {0}", m1.time);
    System.Console.WriteLine("    Value: {0}", m1.value);

    System.Console.WriteLine("Test message 2:\n    ID: {0}", m2.id);
    System.Console.WriteLine("    Time: {0}", m2.time);
    System.Console.WriteLine("    Value: {0}", m2.value);   

    // Ask queue name and store it
    System.Console.WriteLine("\nName of queue to publish to: ");
    string queueName = (System.Console.ReadLine()).ToString();
    obj1.publishToQueue( queueName );

    // Write message to the queue
    obj1.writeMessage( m1 );    

    System.Console.WriteLine("\nName of queue to publish to: ");
    string queueName2 = (System.Console.ReadLine()).ToString();
    obj1.publishToQueue( queueName2 );

    obj1.writeMessage( m2 );

    obj1.disposeAll();
}
}
Run Code Online (Sandbox Code Playgroud)

//App2.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;

public class MainClass
{
    public static void Main()
    {
    //Asks for the message system
    System.Console.WriteLine("\nEnter name of messageing system: ");
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
    string MsgSysName = (System.Console.ReadLine()).ToString();

    //Declare an IMessageBus instance:
    //Here, an object of the corresponding Message System
        // (ex. Rabbit, Zmq, etc) is instantiated
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    //Create a new Measurement object m
    Measurement m = new Measurement();  

    System.Console.WriteLine("Queue name to subscribe to: ");
    string QueueName1 = (System.Console.ReadLine()).ToString();
    obj1.subscribeToQueue( QueueName1 );

    //Read message into m
    m = obj1.readMessage();

    if (m != null ) {
        System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}", m.id, m.id);
        System.Console.WriteLine("    Time: {0}", m.time);
        System.Console.WriteLine("    Value: {0}", m.value);
    }

    System.Console.WriteLine("Another queue name to subscribe to: ");
    string QueueName2 = (System.Console.ReadLine()).ToString();
    obj1.subscribeToQueue( QueueName2 );

    m = obj1.readMessage();

    if (m != null ) {
        System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}", m.id, m.id);
        System.Console.WriteLine("    Time: {0}", m.time);
        System.Console.WriteLine("    Value: {0}", m.value);
    }

    obj1.disposeAll();
}
}
Run Code Online (Sandbox Code Playgroud)

sgt*_*gtz 13

两个信息来源:

  1. http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

  2. 你应该首先尝试理解这些例子.

    • %Program Files%\ RabbitMQ\DotNetClient\examples\src(基本示例)

    • 从他们的Mercurial存储库(c#项目)中获取完整的工作示例.

有用的操作来理解:

  • 声明/断言/收听/订阅/发布

回复:你的问题 - 没有理由你不能拥有多个听众.或者您可以在"交换"上使用一个侦听器订阅n个路由路径.

**re:非阻塞**

典型的监听器一次消耗一条消息.您可以将它们从队列中拉出来,或者它们将以"窗口化"方式自动放置在靠近消费者的位置(通过服务质量qos参数定义).这种方法的优点在于为您完成了许多艰苦的工作(重新:可靠性,保证交付等).

RabbitMQ的一个关键特性是,如果处理中出现错误,则将消息重新添加回队列(容错功能).

需要了解更多关于你的情况.

通常,如果您发布到我上面提到的列表中,您可以在RabbitMQ上找到工作人员.他们非常有帮助.

希望有所帮助.一开始你的头脑很多,但值得坚持下去.


Q&A

见:http://www.rabbitmq.com/faq.html

问:您可以使用新订阅(频道,queueName)订阅多个队列吗?

是.您可以使用绑定密钥,例如abc.*.hij,或abc.#.hij,或者您附加多个绑定.前者假设您已经围绕某种对您有意义的原则设计了路由密钥(请参阅常见问题解答中的路由密钥).对于后者,您需要绑定到多个队列.

手动实现n绑定.请参阅:http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

这种模式背后的代码不多,所以如果通配符不够,你可以推出自己的订阅模式.你可以从这个类继承并添加另一个方法来进行额外的绑定...可能这会有效或者接近这个(未经测试).

AQMP规范说可以进行多种手动绑定:http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

问:如果是这样,我如何通过所有订阅的队列并返回一条消息(没有消息时为null)?

对于订户,您会在有消息时收到通知.否则,您所描述的是一个拉动界面,您可以根据请求提取消息.如果没有可用的消息,您将根据需要获得null.顺便说一句:Notify方法可能更方便.

问:哦,请注意,我用不同的方法完成了所有这些操作.我将编辑我的帖子以反映代码

实时代码:

此版本必须使用通配符来订阅多个路由密钥

使用订阅的手动路由密钥留给读者练习.;-)我认为你无论如何都倾向于拉接口.顺便说一句:拉接口效率低于通知接口.

        using (Subscription sub = new Subscription(ch, QueueNme))
        {
            foreach (BasicDeliverEventArgs ev in sub)
            {
                Process(ev.Body);

        ...
Run Code Online (Sandbox Code Playgroud)

注意:foreach使用IEnumerable,IEnumerable通过"yield"语句包装新消息到达的事件.实际上它是一个无限循环.

---更新

AMQP的设计理念是将TCP连接的数量保持在与应用程序数量一样低的程度,这意味着每个连接可以有多个通道.

这个问题中的代码(编辑3)尝试使用一个通道的两个订阅者,而它应该(我相信),每个通道每个通道一个订阅者,以避免锁定问题.消化:使用路由密钥"通配符".可以使用java客户端订阅多个不同的队列名称,但据我所知,.net客户端在订阅者帮助程序类中实现了这一点.

如果您确实在同一订阅线程上需要两个不同的队列名称,则建议为.net建议以下拉序列:

        using (IModel ch = conn.CreateModel()) {    // btw: no reason to close the channel afterwards IMO
            conn.AutoClose = true;                  // no reason to closs the connection either.  Here for completeness.

            ch.QueueDeclare(queueName);
            BasicGetResult result = ch.BasicGet(queueName, false);
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:");
            }

            return 0;
        }
Run Code Online (Sandbox Code Playgroud)

- 更新2:

来自RabbitMQ列表:

"假设element.Next()在其中一个订阅上被阻止.您可以从每个订阅中检索交付,并通过超时读取它.或者您可以设置一个队列来接收所有度量并从中检索消息单一订阅." (埃米尔)

这意味着当第一个队列为空时,.Next()会阻塞等待下一条消息出现.即订户内置了等待下一条消息.

- 更新3:

在.net下,使用QueueingBasicConsumer从多个队列中使用.

实际上,这是一个关于它的线索,以了解使用情况:

等待单个RabbitMQ消息超时

- 更新4:

关于.QueueingBasicConsumer的更多信息

这里有示例代码.

http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html

示例通过一些修改复制到答案中(参见// <-----).

                IModel channel = ...;
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, false, null, consumer);  //<-----
            channel.BasicConsume(queueName2, false, null, consumer); //<-----
            // etc. channel.BasicConsume(queueNameN, false, null, consumer);  //<-----

            // At this point, messages will be being asynchronously delivered,
            // and will be queueing up in consumer.Queue.

            while (true) {
                try {
                    BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                    // ... handle the delivery ...
                    channel.BasicAck(e.DeliveryTag, false);
                } catch (EndOfStreamException ex) {
                    // The consumer was cancelled, the model closed, or the
                    // connection went away.
                    break;
                }
            }
Run Code Online (Sandbox Code Playgroud)

- 更新5:一个简单的get将作用于任何队列(一个更慢,但有时更方便的方法).

            ch.QueueDeclare(queueName);
            BasicGetResult result = ch.BasicGet(queueName, false);
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:"); 
                // deserialize body and display extra info here.
            }
Run Code Online (Sandbox Code Playgroud)