Cor*_*ped 2 cloud publish-subscribe google-cloud-platform google-cloud-pubsub
在我的业务应用程序中,我必须定期批处理来自某个主题的所有消息,因为它比以先到先服务的方式处理它们要便宜。我目前计划的方法是让cronjob订阅者每小时运行一次T。我当前正在解决的问题是如何在处理完所有消息后终止订阅者。cronjob我想每小时启动一次T,让订阅者消耗主题队列中的所有消息并终止。据我了解,没有pub-subJava API 告诉我主题队列是否为空。我想出了以下2个解决方案:
创建一个异步拉取的订阅者。t minutes在它消耗所有消息时休眠,然后使用 终止它subscriber.stopAsync().awaitTerminated();。在这种方法中,我有可能在终止订阅者之前不会消耗所有消息。谷歌的例子在这里
用于Pub/Sub Cloud monitoring查找指标的值 subscription/num_undelivered_messages。然后使用Google 提供的同步拉取示例拉取那么多消息。然后终止订阅者。
有一个更好的方法吗?
谢谢!
可能值得考虑 Cloud Pub/Sub 是否是适合这种情况的正确技术。如果您想进行批处理,最好将数据存储在 Google Cloud Storage 或数据库中。Cloud Pub/Sub 确实最适合连续拉取/处理消息。
您提出的两个建议是尝试确定何时不再有消息需要处理。确实没有一种干净的方法可以做到这一点。您的第一个建议是可能的,但请记住,虽然大多数消息都会非常快地传递,但可能会有异常值需要更长的时间才能发送给您的订阅者。如果处理所有未完成的消息至关重要,那么此方法可能不起作用。但是,如果下次启动订阅者时偶尔处理消息是可以的,那么您可以使用此方法。最好按照 guillaum blaquiere 的建议,自上次收到消息以来设置一个计时器,尽管我会使用 1 分钟左右的超时而不是 100 毫秒。
您的第二个建议是监视未传递消息的数量,然后发送拉取请求来检索这么多消息,这不是一种可行的方法。首先,max_messages拉取请求的属性并不保证max_messages返回所有可用的消息。有可能在拉取响应中取回零消息,但仍然有未传递的消息。因此,您必须保留收到的消息数并尝试匹配num_undelivered_messages指标。您必须考虑到这种情况下的重复交付以及 Stackdriver 监控指标可能落后于实际值的事实。如果该值太大,您可能会尝试获取您不会收到的消息。如果该值太小,您可能无法收到所有消息。
在这两种方法中,跟踪自收到最后一条消息以来多长时间的方法是更好的方法,但有提到的警告。
| 归档时间: |
|
| 查看次数: |
3204 次 |
| 最近记录: |