Storm 中的延迟队列实现——Kafka、Cassandra、Redis 或 Beanstalk?

Ank*_*pta 5 java message-queue delayed-execution apache-kafka apache-storm

我有一个风暴拓扑来处理来自 Kafka 的消息,并根据手头的任务在 Cassandra 中进行 HTTP 调用/保存。我一收到消息就处理。由于响应来自外部源(如 HTTP),很少有消息没有被完全处理。我想为重试实现指数退避机制,以防 HTTP 服务器在一段时间后不响应/返回错误消息以重试。我可以想到一些可以实现它们的想法。如果我可以使用任何其他容错解决方案,我想知道其中哪一个是更好的解决方案。由于这用于实现指数退避,因此每条消息将具有不同的延迟时间。

  • Kafka中的另一个主题发送给它,稍后再使用。我首选的解决方案。我知道我们可以使用 Kafka 偏移量,以便在后期使用消息。我怎么找不到文档/示例代码来做同样的事情。如果有人可以帮助我解决这个问题,那将非常有帮助。
  • 编写消息Cassandra / Redis并编写调度程序以获取未处理且准备好使用的消息并将其发送到 Kafka,以便我的 Storm 拓扑可以使用它。(其他遗留项目中的现有解决方案(非风暴))
  • 延迟发送到Beanstalk(其他遗留项目中的现有解决方案(非风暴)。我想避免使用这个解决方案,只有在我无法选择的情况下才使用它)。

虽然这几乎是我想做的。我无法找到文档来实现Kafka - Delayed Queue implementation using high level consumer 中提到的delayProcessingUntil

我过去曾使用 Beanstalk 从 Data-store 完成预定工作并延迟,但我更喜欢使用 Kafka。

hob*_*lin 1

Kafka spout 内置指数退避消息重试。您可以通过 spout 配置来配置初始延迟、延迟乘数和最大延迟。如果bolt出现错误,可以调用collector.fail(input)。之后,您只需将其留给 spout 即可重试。

https://github.com/apache/storm/blob/v0.10.0/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java