ses*_*ses 3 java google-cloud-platform google-cloud-pubsub
如何防止在 Google Cloud PubSub 中发生重复的 msg?
说,我有一个代码来处理它订阅的味精。
说,我有 2 个节点具有相同的服务,具有此代码。
一旦收到消息但尚未确认,另一个节点将收到相同的消息。这就是问题所在,我们有两个重复的 msgs。
void messageReceiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReply) {
submitHandler.handle(toMessage(pubsubMessage))
.doOnSuccess((response) -> {
log.info("Acknowledging the successfully processed message id: {}, response {}", pubsubMessage.getMessageId(), response);
ackReply.ack(); // <---- acknowledged
})
.doOnError((e) -> {
log.error("Not acknowledging due to an exception", e);
ackReply.nack();
})
.doOnTerminate(span::finish)
.subscribe();
}
Run Code Online (Sandbox Code Playgroud)
解决这个问题的方法是什么?这是正常行为吗?
Google Cloud Pub/Sub 使用“至少一次”交付。从文档:
通常,Cloud Pub/Sub 会按每条消息的发布顺序一次性交付每条消息。但是,有时可能会乱序或多次传递消息。一般来说,适应多次传递要求您的订阅者在处理消息时是幂等的。
这意味着它保证它将传递消息 1:N 次,因此如果您不首先通过其他方式对其进行重复数据删除,您可能会多次获得该消息。没有您可以定义的设置来保证完全交付一次。文档确实参考了您可以使用 Cloud Dataflow's 获得所需的行为PubSubIO,但该解决方案似乎已被弃用:
您可以使用 Cloud Dataflow 实现一次对 Cloud Pub/Sub 消息流的处理
PubsubIO。PubsubIO 对自定义消息标识符或 Cloud Pub/Sub 分配的消息进行重复数据删除。
说了这么多,我实际上从未见过 Google Cloud Pub/Sub 两次发送消息。您确定这确实是您遇到的问题,还是因为您没有在确认截止日期内确认消息而重新发出消息(如上所述,默认为 10 秒)。如果你不承认它,它会被重新发行。从文档 (强调我的):
为单个主题创建订阅。它有几个可以在创建时设置或稍后更新的属性,包括:
- 一个确认截止日期:如果您的代码没有在截止日期前确认该消息,该消息被再次发送。默认值为 10 秒。您可以指定的最大自定义截止时间为 600 秒(10 分钟)。
如果是这种情况,只需在截止日期内确认您的消息,您就不会经常看到这些重复消息。
小智 6
您可以使用 Memorystore 中的 Redis 来删除重复消息。您的发布者应在将消息发布到 PubSub 之前将跟踪 ID 添加到消息正文。另一方面,客户端(订阅者)应检查跟踪 ID 是否在缓存中 - 跳过该消息。如果没有此类消息 - 处理消息并将跟踪 ID 添加到缓存,过期时间为 7-8 天(PubSub 截止时间为 7 天)。通过这种简单的方式,您可以授予收到的正确消息。
| 归档时间: |
|
| 查看次数: |
6870 次 |
| 最近记录: |