lin*_*jus 6 google-cloud-pubsub
根据 stackdriver 图表,我们开始注意到某个主题/订阅的“未确认消息”数量不时增加。
我不知道我们对 stackdriver 图表的信任程度如何,但我已经检查过了:
另外,根据我们的日志,我可以看到 pubsub 实际上多次发送相同的消息,这也证实了“pull”成功,但“ack”可能不成功。
因此,我认为我们可以假设我们的系统会及时拉取,但从 GCP 的角度来看,ACK 效果不佳。
我检查了不按时发送 ACK 的可能性,但我认为情况并非如此,如下所示。
在有问题的订阅中,消息会累积几个小时。对于我们来说,这是一个严重的问题。
我们出于某种原因使用拉式方法,并且不愿意切换到推式方法,除非有充分的理由。对于每个订阅,我们都有一个消息泵送 Goroutine,并且该 Goroutine 会为每条拉取的消息生成一个工作线程。更具体,
// in a dedicated message-pumping goroutine
sub, _ := CreateSubscription(..., 0 /* ack-deadline */, )
iter, _ := sub.Pull(...)
for {
// omitted: wait if we have too many workers
msg, _ := iter.Next()
go func(msg Message) {
// omitted: handle the message and measure the latency; it turned out the latency is almost within 1 second
msg.Done(true)
}(msg)
}
Run Code Online (Sandbox Code Playgroud)
为了实现负载平衡,同一集群中的其他 Pod 也会拉取订阅。因此,对于一个订阅(如 Google Pubsub 主题/订阅),我们有多个订阅对象(如 Go 绑定的订阅结构),每个对象仅在一个 Pod 中使用。并且,每个订阅对象都会创建一个迭代器。我相信这个设置没有错,但如果我错了,请纠正我。
如这段代码所示,我们执行 ACK。(我们的服务器不会恐慌;所以没有办法绕过 msg.Done()。)
奇怪的是,有问题的订阅并不是繁忙的订阅。对于在同一 Pod 中接收更多消息的另一个订阅,我们通常不会有任何问题。所以,我开始怀疑拉取操作的 max-prefetch 选项是否会产生影响。看似解决了一段时间的问题,但问题又出现了。
根据 Google 支持的建议,我还增加了 Pod 的数量,这实际上增加了工作人员的数量。这并没有多大帮助。由于我们不会向有问题的问题发布很多消息(大约 1 条消息/秒)并且我们有大量(可能太多)工作人员,因此我不认为我们的服务器过载。
有人可以解释一下吗?
就我而言,由于某种原因 Ack 未返回的症状经常出现,未设置调用 gRPC 的超时,并且“acker”的例程被阻塞。
所以我通过从 pubsub.NewClient 传递 gRPC 选项来解决这个问题。
import (
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
// ...
scChan := make(chan grpc.ServiceConfig)
go func() {
sc := grpc.ServiceConfig{
Methods: map[string]grpc.MethodConfig{
"/google.pubsub.v1.Subscriber/Acknowledge": {
Timeout: 5 * time.Second,
},
},
}
scChan <- sc
}()
c, err := pubsub.NewClient(ctx, project, option.WithGRPCDialOption(grpc.WithServiceConfig(scChan)))
Run Code Online (Sandbox Code Playgroud)
您可以通过指定 来调查原因grpc.EnableTracing = true。
grpc.EnableTracing = true
c, err := pubsub.NewClient(ctx, project)
if err != nil {
return nil, errors.Wrap(err, "pubsub.NewClient")
}
go func(){
http.ListenAndServe(":8080", nil)
}()
Run Code Online (Sandbox Code Playgroud)
gRPC的trace信息可以通过 确认golang.org/x/net/trace。
| 归档时间: |
|
| 查看次数: |
1754 次 |
| 最近记录: |