RabbitMQ C# 验证消息已发送

And*_*tia 5 c# rabbitmq

我是 RabbitMQ 的新手,并尝试写入队列并验证消息是否已发送。如果它失败了,我需要知道它。我制作了一个假队列来观看它失败,但无论我看到什么都没有执行,当我在寻找 ack 时,我总是得到一个。我从没见过 BasicNack。

我什至不确定我是 BasicAcks 是要走的路。

    private void button1_Click(object sender, EventArgs e)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("task_queue", true, false, false, null);

                var message = ("Helllo world");
                var body = Encoding.UTF8.GetBytes(message);
                channel.ConfirmSelect();

                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);
                properties.DeliveryMode = 2;
                channel.BasicAcks += channel_BasicAcks;
                channel.BasicNacks += channel_BasicNacks;
                //fake queue should be task_queue
                channel.BasicPublish("", "task_2queue", true, properties, body);

                channel.WaitForConfirmsOrDie();

                Console.WriteLine(" [x] Sent {0}", message);
            }
        }
    }

    void channel_BasicNacks(IModel model, BasicNackEventArgs args)
    {

    }

    void channel_BasicAcks(IModel model, BasicAckEventArgs args)
    {

    }
Run Code Online (Sandbox Code Playgroud)

Tod*_*nce 5

对于那些寻找 C# 答案的人 - 这就是您所需要的。

https://rianjs.net/2013/12/publisher-confirms-with-rabbitmq-and-c-sharp

像这样:(BasicAcks 附加了一个事件处理程序 - 还有 BasicNacks)

using (var connection = FACTORY.CreateConnection())
{
    var channel = connection.CreateModel();
    channel.ExchangeDeclare(QUEUE_NAME, ExchangeType.Fanout, true);
    channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
    channel.QueueBind(QUEUE_NAME, QUEUE_NAME, String.Empty, new Dictionary<string, object>());
     channel.BasicAcks += (sender, eventArgs) =>
                {
                    //implement ack handle
                };
    channel.ConfirmSelect();

    for (var i = 1; i <= numberOfMessages; i++)
    {
        var messageProperties = channel.CreateBasicProperties();
        messageProperties.SetPersistent(true);

        var message = String.Format("{0}\thello world", i);
        var payload = Encoding.Unicode.GetBytes(message);
        Console.WriteLine("Sending message: " + message);
        channel.BasicPublish(QUEUE_NAME, QUEUE_NAME, messageProperties, payload);
        channel.WaitForConfirmsOrDie();
    }
}
Run Code Online (Sandbox Code Playgroud)


Gab*_*ele 2

您需要发布者确认

正如您所读到的,您可以实施:

交易:

ch.txSelect(); <-- start transaction
ch.basicPublish("", QUEUE_NAME,
                            MessageProperties.PERSISTENT_BASIC,
                            "nop".getBytes());
ch.txCommit();<--commit transaction
Run Code Online (Sandbox Code Playgroud)

消息被存储到队列和磁盘中。这种方式可能会很慢,如果您需要性能,则不应使用它。

您可以使用流式轻量级发布者确认,方法是:

ch.setConfirmListener(new ConfirmListener() {
    public void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            unconfirmedSet.headSet(seqNo+1).clear();
        } else {
            unconfirmedSet.remove(seqNo);
        }
    }
    public void handleNack(long seqNo, boolean multiple) {
        // handle the lost messages somehow
    }
Run Code Online (Sandbox Code Playgroud)

我希望它有帮助

  • 您提供的流轻量级示例不是 c# 中的,该操作正在使用的操作,您可以考虑编辑此答案并添加 ac# 实现吗? (3认同)
  • WaitForConfirmsOrDie 比 txcommit 更快,无论如何我建议使用 WaitForConfirmsOrDie(TIME_OUT),否则如果出现问题,您可能会面临永远阻止发布的风险。如果您需要表演,确认发布通常不是一个好的选择。所以你可以尝试在多线程中打开更多的通道并使用WaitForConfirmsOrDie(TIME_OUT)。我认为你可以做出正确的妥协。让我知道 (2认同)