sim*_*ley 5 go google-compute-engine google-cloud-pubsub
我们正在从 AMQP 迁移到 Google 的 Pubsub。
文档表明pull 可能是我们的最佳选择,因为我们使用的是计算引擎并且无法打开我们的工作人员以通过推送服务接收。
它还说拉动可能会产生额外的费用,具体取决于使用情况:
如果使用轮询,如果您频繁打开连接并立即关闭它们,可能会导致高网络使用率。
我们在 go 中创建了一个测试订阅者,它在循环中运行,如下所示:
func main() {
jsonKey, err := ioutil.ReadFile("pubsub-key.json")
if err != nil {
log.Fatal(err)
}
conf, err := google.JWTConfigFromJSON(
jsonKey,
pubsub.ScopeCloudPlatform,
pubsub.ScopePubSub,
)
if err != nil {
log.Fatal(err)
}
ctx := cloud.NewContext("xxx", conf.Client(oauth2.NoContext))
msgIDs, err := pubsub.Publish(ctx, "topic1", &pubsub.Message{
Data: []byte("hello world"),
})
if err != nil {
log.Println(err)
}
log.Printf("Published a message with a message id: %s\n", msgIDs[0])
for {
msgs, err := pubsub.Pull(ctx, "subscription1", 1)
if err != nil {
log.Println(err)
}
if len(msgs) > 0 {
log.Printf("New message arrived: %v, len: %d\n", msgs[0].ID, len(msgs))
if err := pubsub.Ack(ctx, "subscription1", msgs[0].AckID); err != nil {
log.Fatal(err)
}
log.Println("Acknowledged message")
log.Printf("Message: %s", msgs[0].Data)
}
}
}
Run Code Online (Sandbox Code Playgroud)
不过,我的问题实际上是这是否是提取消息的正确/推荐方式。
我们全天每秒收到大约 100 条信息。我不确定在无限循环中运行它是否会让我们破产并且找不到任何其他像样的 go 示例。
一般来说,在 Cloud Pub/Sub 中拉取订阅者的关键是确保您始终至少有一些未完成的拉取请求,并将 max_messages 设置为适合以下情况的值:
一旦拉取请求返回,您应该发出另一个拉取请求。这意味着异步处理和确认拉取响应中返回给您的消息(或异步启动新的拉取请求)。如果您发现吞吐量或延迟不符合您的预期,首先要做的就是添加更多并发拉取请求。
如果您的发布率极低,那么“如果使用轮询,如果您频繁打开连接并立即关闭它们,可能会导致较高的网络使用率”这一说法适用。想象一下,您一天只发布两到三条消息,但您不断轮询拉取请求。这些拉取请求中的每一个都会产生发出请求的成本,但是除了实际收到消息的几次之外,您不会得到任何要处理的消息,因此“每条消息的成本”相当高。如果您以相当稳定的速率发布,并且您的拉取请求返回非零数量的消息,那么网络使用率和成本将与消息速率一致。
| 归档时间: |
|
| 查看次数: |
3816 次 |
| 最近记录: |