从队列中“弹出”一条消息的正确 Go/RabbitMQ 方法?

the*_*one 4 amqp go rabbitmq kubernetes

我的第一个问题实际上是一个设计问题。这是我第一次编写使用队列的服务,而且我也是 Go 新手。我正在尝试确定是否应该以这样的方式编写我的工作程序,即它只需从队列中弹出一条消息,对其进行处理,然后就消失了。对于像 Kubernetes 这样的东西,这看起来相当微不足道。

或者我应该让一个长寿的工作人员不断等待新消息,但如果它死了(由于错误或意外),就会重新启动?

我问这个问题的原因是,为了实现前者,感觉有点“被破解”,因为我必须使用通用的 go AMQP 库编写以下内容streadway/amqp(阅读评论):

// Pop will extract a message from the AMQP queue
func (v *Queue) Pop() (data []byte, err error) {
    msgs, err := v.Channel.Consume(
        v.QueueName, // queue
        v.ConsmerID, // consumer
        true,        // auto-ack
        false,       // exclusive
        false,       // no-local
        false,       // no-wait
        nil,         // args
    )
    if err != nil {
        return nil, err
    }

    // We have to use for .. range because Consume returns
    // "<-chan Delivery" but if we only want ONE message popped off
    // we return on the first one
    for data := range msgs {
        return data.Body, nil
    }

    // We should never get this far...
    return nil, errors.New("Something went wrong")
}
Run Code Online (Sandbox Code Playgroud)

<-chan Delivery此外,这种情况下是什么?它看起来像是某种可以插入的“流”或对象。有没有办法不必为这些数据类型编写 for 循环?

编辑:我还发现,该代码似乎会使整个队列出队,即使它只执行一次 for 循环迭代(如上面的代码所示)。我也不确定为什么会发生这种情况?

相关代码链接:

hel*_*ert 5

要简单地从 a 中获取单个对象<-chan Delivery,请不要使用range循环,而是使用通道运算符<-

data := <- msgs
return data.Body, nil
Run Code Online (Sandbox Code Playgroud)

至于为什么你一获取一条消息整个队列就被清空了:这很可能是由于Consumer prefetch造成的。当消费消息时,客户端实际上不会从代理中逐一弹出它们,而是以可配置大小的批次(如果我没记错的话,默认情况下大约 32 或 64 条消息的顺序)弹出它们。一旦经纪人向您的消费者发布了这批消息,它们就会出现在您的msgs频道中;如果您在收到第一条消息后不再从该通道读取消息,则其余消息将消失(至少在auto-ack启用状态下 - 否则,它们将在通道关闭后重新排队)。

要一次仅获取一条消息,请使用通道的QoS函数(第一个参数是预取计数):

err := v.Channel.Qos(1, 0, false)
Run Code Online (Sandbox Code Playgroud)