kle*_*sch 8 apache-kafka apache-storm
我们正在使用Storm与Kafka Spout.当我们失败消息时,我们想重播它们,但在某些情况下,错误的数据或代码错误会导致消息总是失败一个Bolt,所以我们将进入一个无限的重放周期.显然我们在找到错误时会修复错误,但希望我们的拓扑通常具有容错能力.在重放N次以上之后我们怎么能得到一个元组?
通过Kafka Spout的代码,我看到它被设计为使用指数退避计时器和PR状态的注释重试:
"喷口不会终止重试周期(我确信它不应该这样做,因为它不能报告关于中止请求的故障的上下文),它只处理延迟重试.拓扑中的螺栓是仍然期望最终调用ack()而不是fail()来停止循环."
我已经看到StackOverflow响应建议编写自定义喷口,但如果有一种推荐的方法在Bolt中执行此操作,我宁可不要卡住维护Kafka Spout内部的自定义补丁.
什么是在博尔特这样做的正确方法?我没有看到元组中的任何状态暴露了重播的次数.
Storm本身不会为您的问题提供任何支持.因此,定制解决方案是唯一的出路.即使你不想打补丁KafkaSpout
,我认为,引入计数器并打破其中的重播周期,将是最好的方法.作为替代方案,您也可以继承KafkaSpout
并在您的子类中放置一个计数器.这当然有点类似于补丁,但可能不那么具有侵入性并且更容易实现.
如果你想使用Bolt,你可以执行以下操作(这也需要KafkaSpout
对它或它的子类进行一些更改).
KafkaSpout
经由fieldsGrouping
上的ID(以确保该被重放的元组被流传输到相同的螺栓实例).HashMap<ID,Counter>
缓冲所有元组并计算(重新)尝试的次数.如果计数器小于阈值,则转发输入元组,使其由后面的实际拓扑处理(当然,您需要适当地锚定元组).如果计数大于您的阈值,请确定元组以打破循环并从中删除其条目HashMap
(您可能还希望记录所有失败的元组).HashMap
,每次获取元组时,KafkaSpout
需要将元组ID转发给螺栓,以便它可以从元组中删除元组HashMap
.只需为您的KafkaSpout
子类声明第二个输出流并覆盖Spout.ack(...)
(当然,您需要调用super.ack(...)
以确保KafkaSpout
获取确认).但这种方法可能会占用大量内存.作为替换为每个元组都有一个条目,HashMap
你也可以使用第三个流(与另外两个连接到bolt),如果元组失败(即in Spout.fail(...)
),则转发元组ID .每次,螺栓从该第三流接收"失败"消息,计数器增加.只要没有条目HashMap
(或未达到阈值),螺栓就会转发元组进行处理.这应该减少使用的内存,但需要在你的喷口和螺栓中实现更多的逻辑.
这两种方法都有缺点,即每个acked元组都会为新引入的螺栓产生额外的消息(从而增加网络流量).对于第二种方法,似乎您只需要向螺栓发送"确认"消息,以获取之前失败的元组.但是,您不知道哪些元组确实失败了哪些元组失败了.如果你想摆脱这种网络开销,你可以引入第二个HashMap
在KafkaSpout
该缓存失败的消息的ID.因此,如果成功重播失败的元组,则只能发送"确认"消息.当然,第三种方法使逻辑实现更加复杂.
如果不KafkaSpout
进行某些修改,我认为没有解决方案.我个人补丁KafkaSpout
或将使用用第三种方法HashMap
在KafkaSpout
子类和螺栓(因为它消耗很少的内存,并且不投入了大量的额外负载的网络上相比前两种解决方案).
归档时间: |
|
查看次数: |
2374 次 |
最近记录: |