在一系列螺栓中在Storm中确认的正确方法

Adr*_*ian 24 distributed-computing apache-storm

只是想确保我知道Ack-ing在Storm中是如何工作的.我有1个喷口和2个螺栓链在一起.Spout向Bolt1发出元组,而Bolt1又向Bolt 2发出一个元组.我希望Bolt 2能够从Spout发出的初始元组,我不确定如何.

为了保证容错(即:重新发送元组),我想在螺栓2中确认Spout发出的元组,以防万一它在过程中的某个地方失败,因此可以重新发送.

考虑这个例子:

喷口:

 _collector.emit(new Values(queue.dequeue())
Run Code Online (Sandbox Code Playgroud)

Bolt1:

def execute(tuple: Tuple) {
 _collector.emit(tuple, new Values("stuff"))
}
Run Code Online (Sandbox Code Playgroud)

在这一点上,元组是由喷口发送的元组.我可以在这里说它没有问题.现在添加另一个监听Bolt1发出的元组的螺栓.

Bolt2:

def execute(tuple2: Tuple) {
 _collector.emit(tuple2, new Values("foo"))
}
Run Code Online (Sandbox Code Playgroud)

此时tuple2中的元组是从Bolt1发送的元组(其中包含字符串"stuff"的元组).
因此,如果我在Bolt2中发送一个ack,这将从Bolt1中获取元组,而不是从Spout发送的元组.正确?

我如何识别从喷口发出的元组?我应该把所有其他喷口上的初始喷口扛回来,这样我就可以在最后一个螺栓上找回它并确认它吗?

我阅读了Nathan的教程,我得到的印象是,在发出tuple2后,我可以在那里收到Bolt1(来自Spout)收到的元组.这会将新发出的tuple2链接到Spout发送的原始元组,所以当Bolt2确认元组2时,它实际上会从Spout中获取原始元组.这是真的?

如果我在解释中遗漏了某些内容,请告诉我.

Adr*_*ian 28

对于那些感兴趣的人,我通过询问风暴组找到了解决方案.我需要的是在Spout中以下列方式发出元组(使用唯一ID):

喷口:

 //ties in tuple to this UID
 _collector.emit(new Values(queue.dequeue(), *uniqueID*) 
Run Code Online (Sandbox Code Playgroud)

然后Bolt1只有在它发送到Bolt2之后才会响应元组

Bolt1:

 //emit first then ack
 _collector.emit(tuple, new Values("stuff")) //**anchoring** - read below to see what it means
 _collector.ack(tuple) 
Run Code Online (Sandbox Code Playgroud)

此时来自Spout的元组已经在Bolt1中被激活,但同时新发出的元组"东西"到Bolt2被"锚定"到Spout的元组.这意味着它仍然需要稍后被激活,否则在超时它将被喷口重新发送.

Bolt2:

 _collector.ack(tuple) 
Run Code Online (Sandbox Code Playgroud)

Bolt2需要确认从Bolt1收到的元组,它将发送Spout等待的最后一个ack.如果此时Bolt2发出元组,那么必须有一个Bolt3来获取它并确认它.如果元组在最后一点没有被激活,Spout会将其计时并重新发送.

每次锚定都是在一个emit语句中完成的,一个"树"结构中的一个新节点被构建......更像是我的情况下的列表,因为我从来没有将相同的元组发送到2个或更多元组,我有一对一的关系.

需要对树中的所有节点进行确认,然后才将元组标记为完全到达.如果元组没有被激活并且它随后被UID发送并且稍后被锚定,那么它将被永久保存在存储器中(直到被激活).

希望这可以帮助.


Vor*_*Vor 0

你需要anchor元组。看看Guaranteing-message-processing 尤其是你需要这个:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
Run Code Online (Sandbox Code Playgroud)