我为什么不在Spout.nextTuple()中循环或阻塞

Mat*_*Sax 5 apache-storm

我看到很多代码片段里面都使用了一个循环Spout.nextTuple()(例如读取整个文件并为每一行发出一个元组):

public void nextTuple() {
    // do other stuff here

    // reader might be BufferedReader that is initialized in open()
    String str;
    while((str = reader.readLine()) != null) {
        _collector.emit(new Values(str));
    }

    // do some more stuff here
}
Run Code Online (Sandbox Code Playgroud)

这段代码似乎是直截了当的,然而,我被告知应该循环内部nextTuple().问题是为什么?

Mat*_*Sax 6

执行Spout时,它在单个线程中运行.这个线程"永远"循环,并有多个职责:

  1. 呼叫 Spout.nextTuple()
  2. 检索"确认"并处理它们
  3. 检索"失败"并处理它们
  4. 超时元组

为了实现这一点,必须保持"永远"(即循环或阻塞),nextTuple()但在向系统发出元组后返回(或者如果没有发出元组,则返回,但不阻止) .否则,Spout无法正常工作.nextTuple()将由Storm调用.因此,在处理了ack/fail消息等之后,下一次调用将nextTuple()很快发生.

因此,在一次调用中发出多个元组也被认为是不好的做法nextTuple().只要代码保持不变nextTuple(),spout线程就不能(例如)对传入的ack作出反应.这可能会导致不必要的超时,因为无法及时处理.

最佳做法是为每次调用发出一个元组nextTuple().如果没有可以发出的元组,你应该返回(不发出)而不是等到元组可用.