the*_*one 4 amqp go rabbitmq kubernetes
我的第一个问题实际上是一个设计问题。这是我第一次编写使用队列的服务,而且我也是 Go 新手。我正在尝试确定是否应该以这样的方式编写我的工作程序,即它只需从队列中弹出一条消息,对其进行处理,然后就消失了。对于像 Kubernetes 这样的东西,这看起来相当微不足道。
或者我应该让一个长寿的工作人员不断等待新消息,但如果它死了(由于错误或意外),就会重新启动?
我问这个问题的原因是,为了实现前者,感觉有点“被破解”,因为我必须使用通用的 go AMQP 库编写以下内容streadway/amqp(阅读评论):
// Pop will extract a message from the AMQP queue
func (v *Queue) Pop() (data []byte, err error) {
msgs, err := v.Channel.Consume(
v.QueueName, // queue
v.ConsmerID, // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}
// We have to use for .. range because Consume returns
// "<-chan Delivery" but if we only want ONE message popped off
// we return on the first one
for data := range msgs {
return data.Body, nil
}
// We should never get this far...
return nil, errors.New("Something went wrong")
}
Run Code Online (Sandbox Code Playgroud)
<-chan Delivery此外,这种情况下是什么?它看起来像是某种可以插入的“流”或对象。有没有办法不必为这些数据类型编写 for 循环?
编辑:我还发现,该代码似乎会使整个队列出队,即使它只执行一次 for 循环迭代(如上面的代码所示)。我也不确定为什么会发生这种情况?
相关代码链接:
要简单地从 a 中获取单个对象<-chan Delivery,请不要使用range循环,而是使用通道运算符<-:
data := <- msgs
return data.Body, nil
Run Code Online (Sandbox Code Playgroud)
至于为什么你一获取一条消息整个队列就被清空了:这很可能是由于Consumer prefetch造成的。当消费消息时,客户端实际上不会从代理中逐一弹出它们,而是以可配置大小的批次(如果我没记错的话,默认情况下大约 32 或 64 条消息的顺序)弹出它们。一旦经纪人向您的消费者发布了这批消息,它们就会出现在您的msgs频道中;如果您在收到第一条消息后不再从该通道读取消息,则其余消息将消失(至少在auto-ack启用状态下 - 否则,它们将在通道关闭后重新排队)。
要一次仅获取一条消息,请使用通道的QoS函数(第一个参数是预取计数):
err := v.Channel.Qos(1, 0, false)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2239 次 |
| 最近记录: |