风暴批次后向卡夫卡提交补偿

Sim*_*imY 7 apache-kafka apache-storm streamparse

当批量螺栓完成批处理时,提交每个分区的最高偏移量的正确方法是什么?我主要担心的是机器在处理批次时死亡,因为整个shebang将在AWS现场实例中运行.

我是暴风雨开发的新手我似乎无法找到IMO的答案是非常直接的使用kafka和风暴.

场景:

基于保证消息处理指南,假设我有一个蒸汽(kafka主题)的("word",count)元组,批处理螺栓,处理X tupples,做一些聚合并创建CSV文件,上传文件到hdfs/db和acks.

在非strom"天真"的实现中,我会读取X msgs(或读取Y秒),聚合,写入hdfs,一旦上传完成,将每个分区的最新(最高)偏移量提交给kafka.如果机器或进程在db提交之前死亡 - 下一次迭代将从前一个地方开始.

在风暴中,我可以创建批处理螺栓,它将锚定所有批处理元组并立即执行它们,但是我找不到将每个分区的最高偏移量提交到kafka的方法,因为spout不知道批处理,所以一旦批量螺栓响应了tupples,每个spout实例都会逐一响应他的tupples,所以我就像我看到的那样:

  1. 在喷口上的每个ack上提交已确认消息的偏移量.这将导致许多提交(每批次可能是几千个tupples),可能是乱序,如果在提交偏移时喷口工作死亡,我将最终部分重放一些事件.
  2. 与1.相同但我可以在最高偏移量中添加一些局部偏移量管理(修复无序偏移提交)并提交每隔几秒钟看到的高位偏移(减少大量提交)但我仍然可以部分结束如果喷口死亡,则会产生偏移
  3. 将偏移子目标逻辑移动到螺栓 - 我可以将每个消息的分区和偏移量添加到发送到批处理螺栓的数据中,并将每个分区的最高过程偏移量作为批次的一部分提交(发送到"偏移提交者"螺栓在批次结束).这将解决偏移跟踪,多次提交和空间重播问题,但这会为螺栓添加kafka特定逻辑,从而将螺栓代码与kafka复制,一般而言,在我看来,重新发明轮子.
  4. 通过车轮改造更进一步,并在ZK中手动管理最高处理的修补 - 偏移组合,并在启动喷口时读取此值.

Den*_*din 0

您的问题有很多,所以不确定这是否完全解决了这个问题,但是如果您担心发送到 kafka 的确认数量(例如在每条消息之后),您应该能够设置消费的批量大小,例如例如 1000 就可以减少很多。