Storm Kafka Spout上最多的元组重放次数

kle*_*sch 8 apache-kafka apache-storm

我们正在使用Storm与Kafka Spout.当我们失败消息时,我们想重播它们,但在某些情况下,错误的数据或代码错误会导致消息总是失败一个Bolt,所以我们将进入一个无限的重放周期.显然我们在找到错误时会修复错误,但希望我们的拓扑通常具有容错能力.在重放N次以上之后我们怎么能得到一个元组?

通过Kafka Spout的代码,我看到它被设计为使用指数退避计时器和PR状态的注释重试:

"喷口不会终止重试周期(我确信它不应该这样做,因为它不能报告关于中止请求的故障的上下文),它只处理延迟重试.拓扑中的螺栓是仍然期望最终调用ack()而不是fail()来停止循环."

我已经看到StackOverflow响应建议编写自定义喷口,但如果有一种推荐的方法在Bolt中执行此操作,我宁可不要卡住维护Kafka Spout内部的自定义补丁.

什么是在博尔特这样做的正确方法?我没有看到元组中的任何状态暴露了重播的次数.

Mat*_*Sax 5

Storm本身不会为您的问题提供任何支持.因此,定制解决方案是唯一的出路.即使你不想打补丁KafkaSpout,我认为,引入计数器并打破其中的重播周期,将是最好的方法.作为替代方案,您也可以继承KafkaSpout并在您的子类中放置一个计数器.这当然有点类似于补丁,但可能不那么具有侵入性并且更容易实现.

如果你想使用Bolt,你可以执行以下操作(这也需要KafkaSpout对它或它的子类进行一些更改).

  • 为每个元组分配一个唯一的ID作为附加属性(可能已经有一个唯一的ID;否则,您可以引入一个"counter-ID"或整个元组,即所有属性,以识别每个元组).
  • 插入后的螺栓KafkaSpout经由fieldsGrouping上的ID(以确保该被重放的元组被流传输到相同的螺栓实例).
  • 在你的螺栓中,使用一个HashMap<ID,Counter>缓冲所有元组并计算(重新)尝试的次数.如果计数器小于阈值,则转发输入元组,使其由后面的实际拓扑处理(当然,您需要适当地锚定元组).如果计数大于您的阈值,请确定元组以打破循环并从中删除其条目HashMap(您可能还希望记录所有失败的元组).
  • 为了从中删除成功处理的元组HashMap,每次获取元组时,KafkaSpout需要将元组ID转发给螺栓,以便它可以从元组中删除元组HashMap.只需为您的KafkaSpout子类声明第二个输出流并覆盖Spout.ack(...)(当然,您需要调用super.ack(...)以确保KafkaSpout获取确认).

但这种方法可能会占用大量内存.作为替换为每个元组都有一个条目,HashMap你也可以使用第三个流(与另外两个连接到bolt),如果元组失败(即in Spout.fail(...)),则转发元组ID .每次,螺栓从该第三流接收"失败"消息,计数器增加.只要没有条目HashMap(或未达到阈值),螺栓就会转发元组进行处理.这应该减少使用的内存,但需要在你的喷口和螺栓中实现更多的逻辑.

这两种方法都有缺点,即每个acked元组都会为新引入的螺栓产生额外的消息(从而增加网络流量).对于第二种方法,似乎您只需要向螺栓发送"确认"消息,以获取之前失败的元组.但是,您不知道哪些元组确实失败了哪些元组失败了.如果你想摆脱这种网络开销,你可以引入第二个HashMapKafkaSpout该缓存失败的消息的ID.因此,如果成功重播失败的元组,则只能发送"确认"消息.当然,第三种方法使逻辑实现更加复杂.

如果不KafkaSpout进行某些修改,我认为没有解决方案.我个人补丁KafkaSpout或将使用用第三种方法HashMapKafkaSpout子类和螺栓(因为它消耗很少的内存,并且不投入了大量的额外负载的网络上相比前两种解决方案).