Google PubSub 和来自 TOPIC 的重复消息

ses*_*ses 3 java google-cloud-platform google-cloud-pubsub

如何防止在 Google Cloud PubSub 中发生重复的 msg?

说,我有一个代码来处理它订阅的味精。

说,我有 2 个节点具有相同的服务,具有此代码。

一旦收到消息但尚未确认,另一个节点将收到相同的消息。这就是问题所在,我们有两个重复的 msgs

void messageReceiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReply) {

        submitHandler.handle(toMessage(pubsubMessage))
                .doOnSuccess((response) -> {
                    log.info("Acknowledging the successfully processed message id: {}, response {}", pubsubMessage.getMessageId(), response);
                    ackReply.ack();  // <---- acknowledged
                })
                .doOnError((e) -> {
                    log.error("Not acknowledging due to an exception", e);
                    ackReply.nack();
                })
                .doOnTerminate(span::finish)
                .subscribe();
    }
Run Code Online (Sandbox Code Playgroud)

解决这个问题的方法是什么?这是正常行为吗?

Tec*_*ium 7

Google Cloud Pub/Sub 使用“至少一次”交付。从文档

通常,Cloud Pub/Sub 会按每条消息的发布顺序一次性交付每条消息。但是,有时可能会乱序或多次传递消息。一般来说,适应多次传递要求您的订阅者在处理消息时是幂等的

这意味着它保证它将传递消息 1:N 次,因此如果您不首先通过其他方式对其进行重复数据删除,您可能会多次获得该消息。没有您可以定义的设置来保证完全交付一次。文档确实参考了您可以使用 Cloud Dataflow's 获得所需的行为PubSubIO,但该解决方案似乎已被弃用

您可以使用 Cloud Dataflow 实现一次对 Cloud Pub/Sub 消息流的处理PubsubIO。PubsubIO 对自定义消息标识符或 Cloud Pub/Sub 分配的消息进行重复数据删除。

说了这么多,我实际上从未见过 Google Cloud Pub/Sub 两次发送消息。您确定这确实是您遇到的问题,还是因为您没有在确认截止日期内确认消息而重新发出消息(如上所述,默认为 10 秒)。如果你不承认它,它会被重新发行。从文档 (强调我的)

为单个主题创建订阅。它有几个可以在创建时设置或稍后更新的属性,包括:

  • 一个确认截止日期如果您的代码没有在截止日期前确认该消息,该消息被再次发送。默认值为 10 秒。您可以指定的最大自定义截止时间为 600 秒(10 分钟)。

如果是这种情况,只需在截止日期内确认您的消息,您就不会经常看到这些重复消息。

  • 现在是 2021 年。你见过重复的吗?让我恼火的是,他们没有一个系统,如果我确认了它,就不会再发送。GOOGLE做不到吗? (2认同)
  • @HenriqueBruno 我可以确认消息重复确实发生了。在过去 3 个月中,我们在大数据环境中发现了约 160 次事件,其中我们使用基于 PubSub 的云功能,通过 REST API(在 Cloud Composer 中)结合时间和基于文件的事件(.okay 文件的到达)触发 Airflow DAG到存储和云调度程序,配置为“*/5 * * * *”)。平均触发次数约为 2000 次/天。 (2认同)
  • 如果有人感兴趣的话,我们看到消息的重复率约为 0.00008(例如总共 104063246 条消息,其中 7992 条是重复的) (2认同)

小智 6

您可以使用 Memorystore 中的 Redis 来删除重复消息。您的发布者应在将消息发布到 PubSub 之前将跟踪 ID 添加到消息正文。另一方面,客户端(订阅者)应检查跟踪 ID 是否在缓存中 - 跳过该消息。如果没有此类消息 - 处理消息并将跟踪 ID 添加到缓存,过期时间为 7-8 天(PubSub 截止时间为 7 天)。通过这种简单的方式,您可以授予收到的正确消息。