RabbitMQ:在关闭并重新打开的通道上确认/否认消息

Oxe*_*arf 2 rabbitmq node.js node-amqplib

我从 RabbitMq 服务器收到此错误

通道被服务器关闭:406(PRECONDITION-FAILED),消息为“PRECONDITION_FAILED - 未知的交货标签 80”

发生这种情况是因为在消费者任务期间连接丢失,最后,当消息被确认/确认时,我收到此错误,因为我无法在与我收到消息的通道不同的通道上确认消息。

这是 RabbitMq 连接的代码

async connect({ prefetch = 1, queueName }) {
    this.queueName = queueName;
    console.log(`[AMQP][${this.queueName}] | connecting`);
    return queue
        .connect(this.config.rabbitmq.connstring)
        .then(conn => {
            conn.once('error', err => {
                this.channel = null;
                if (err.message !== 'Connection closing') {
                    console.error(
                        `[AMQP][${this.queueName}] (evt:error) | ${err.message}`,
                    );
                }
            });

            conn.once('close', () => {
                this.channel = null;
                console.error(
                    `[AMQP][${this.queueName}] (evt:close) | reconnecting`,
                );
                this.connect({ prefetch, queueName: this.queueName });
            });
            return conn.createChannel();
        })
        .then(ch => {
            console.log(`[AMQP-channel][${this.queueName}] created`);
            ch.on('error', err => {
                console.error(
                    `[AMQP-ch][${this.queueName}] (evt:error) | ${err.message}`,
                );
            });
            ch.on('close', () => {
                console.error(`[AMQP-ch][${this.queueName}] (evt:close)`);
            });
            this.channel = ch;
            return this.channel;
        })
        .then(ch => {
            return this.channel.prefetch(prefetch);
        })
        .then(ch => {
            return this.channel.assertQueue(this.queueName);
        })
        .then(async ch => {
            while (this.buffer.length > 0) {
                const request = this.buffer.pop();
                await request();
            }
            return this.channel;
        })
        .catch(error => {
            console.error(error);
            console.log(`[AMQP][${this.queueName}] reconnecting in 1s`);
            return this._delay(1000).then(() =>
                this.connect({ prefetch, queueName: this.queueName }),
            );
        });
}

async ack(msg) {
    try {
        if (this.channel) {
            console.log(`[AMQP][${this.queueName}] ack`);
            await this.channel.ack(msg);
        } else {
            console.log(`[AMQP][${this.queueName}] ack (buffer)`);
            this.buffer.push(() => {
                this.ack(msg);
            });
        }
    } catch (e) {
        console.error(`[AMQ][${this.queueName}] ack error: ${e.message}`);
    }
}
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,在建立连接后,将创建一个通道,在出现连接问题后,该通道将设置为 NULL,并在 1 秒后重试连接,重新创建一个新通道。

为了管理离线期间,我使用一个缓冲区来收集在通道为 NULL 时发送的所有 ack 消息,并且在重新建立连接后我卸载缓冲区。

所以基本上我必须找到一种方法在连接丢失或通道因任何原因关闭后发送 ACK。

谢谢你的帮助

nox*_*fox 6

一旦通道关闭(无论原因是什么),您就无法确认消息。代理将自动将相同的消息重新传递给另一个消费者。

RabbitMQ消息确认部分对此有详细记录。

当消费者失败或失去连接时:自动重新排队

使用手动确认时,当发生传递的通道(或连接)关闭时,任何未确认的传递(消息)都会自动重新排队。这包括客户端的 TCP 连接丢失、消费者应用程序(进程)故障和通道级协议异常(如下所述)。

...

由于这种行为,消费者必须准备好处理重新交付,否则在实施时要考虑到幂等性。重新交付将有一个特殊的布尔属性 redeliver,由 RabbitMQ 设置为 true。对于第一次交付,它将被设置为 false。请注意,消费者可以接收先前传递给另一个消费者的消息。

正如文档所建议的,您需要通过实现消息幂等性设计模式来在消费者端处理此类问题。换句话说,您的架构应该准备好处理由于错误而导致的消息重新传递。

或者,您可以禁用消息确认并获取“一次传递”类型的模式。这意味着如果出现错误,您将不得不处理消息丢失的情况。

有关此事的进一步阅读: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/

Kafka 引入新语义后的后续操作: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery-redux/