如何检测死RabbitMQ连接

Ale*_*der 9 amqp go rabbitmq

我在Go中有一个RabbitMQ使用者脚本(这是一个使用streadway/amqp库的RabbitMQ教程的简单脚本)

问题在于,如果rabbitmq-server停止,则消费者脚本不会退出,并且当再次启动rabbitmq-server时,消费者不再接收消息.

有没有办法检测消费者连接是否已死并重新连接或至少终止消费者脚本?

我知道它设定默认值为10秒.连接的心跳间隔,是否可以使用它?

谢谢你的帮助

    func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
            "test_task_queue", // name
            true,         // durable
            false,        // delete when unused
            false,        // exclusive
            false,        // no-wait
            nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                d.Ack(false)
                dot_count := bytes.Count(d.Body, []byte("."))
                t := time.Duration(dot_count)
                time.Sleep(t * time.Second)
                log.Printf("Done")
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
    }
Run Code Online (Sandbox Code Playgroud)

Uve*_*tel 13

amqp.Connection具有NotifyClose()返回信道传输传输或协议错误的方法.所以像

for {  //reconnection loop
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") //setup
    notify := conn.NotifyClose(make(chan *amqp.Error)) //error channel
...
    ch, err := conn.Channel()
    msgs, err := ch.Consume(
...
    for{  //receive loop
        select {  //check connection
            case err = <-notify:
            //work with error
            break //reconnect
        case d = <- msgs:
            //work with message
        ...
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 应该注意的是,如果有人在连接上调用Close,NotifyClose将返回nil.这样可以区分错误(可能需要重新连接)和正常的应用程序终止. (2认同)

bla*_*een 8

有几种方法可以做到这一点:检查传递通道是否关闭或使用Channel.NotifyClose.

\n

检查发货渠道

\n

启动消费者后,您将从交付渠道收到。如您所知,接收操作可能采用特殊形式x, ok := <-ch,其中当由于通道关闭(且为空)而值为零时ok为 false :x

\n
conn, _ := amqp.Dial(url)\nch, _ := conn.Channel()\n\ndelivery, _ := ch.Consume(\n        queueName,\n        consumerName,\n        true,  // auto ack\n        false, // exclusive\n        false, // no local\n        true,  // no wait,\n        nil,   // table\n    )\n\nfor {\n    payload, ok := <- delivery\n    if !ok {\n        // ... channel closed\n        return\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

<-chan amqp.Delivery这是有效的,因为当 AMQP 通道关闭或发生错误时,Go 通道将被关闭:

\n
\n

[It] 继续向返回的 chan Delivery 进行交付,直到发生 Channel.Cancel、Connection.Close、Channel.Close 或 AMQP 异常。

\n
\n

使用Channel.NotifyClose

\n

这很简单。而且原理是一样的:

\n
\n

NotifyClose 以 Connection.Close 或 Channel.Close 方法的形式注册服务器何时发送通道或连接异常的侦听器。

\n
\n

返回的通道与NotifyClose您作为参数传递的通道相同;该方法仅在内部注册它,所以你可以这样做:

\n
errC := ch.NotifyClose(make(chan *amqp.Error, n))\n
Run Code Online (Sandbox Code Playgroud)\n

其中n是非零缓冲区大小。确保传递缓冲通道,否则NotifyClose,根据代码的结构,库可能会在发送时阻塞。

\n

然后,您可以在通道上接收errC并根据收到的错误类型采取措施。简而言之,错误可能是:

\n
    \n
  • 连接错误,通常无法恢复
  • \n
  • 通道错误,也称为软异常,通常可以通过重置连接来恢复
  • \n
  • nil如果程序conn.Close()故意调用
  • \n
\n

要了解错误是否可恢复,您可以检查amqp.Error\'sCode字段和/或该Recover字段,在发生软异常时该字段设置为 true。

\n

以下函数显示了如何区分错误代码\xe2\x80\x94,这是作为附加见解提供的。对于一般情况,只需检查Error.Recover

\n
conn, _ := amqp.Dial(url)\nch, _ := conn.Channel()\n\ndelivery, _ := ch.Consume(\n        queueName,\n        consumerName,\n        true,  // auto ack\n        false, // exclusive\n        false, // no local\n        true,  // no wait,\n        nil,   // table\n    )\n\nfor {\n    payload, ok := <- delivery\n    if !ok {\n        // ... channel closed\n        return\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n