Storm spout not getting acked

Har*_*ani 6 apache-storm

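In one of our topologies, which has 1 spout and 1 bolt, I have a hunch that the bolt is completing (and acking) its tuples, but the spout still gets a fail for them.

I tried to confirm this with a TaskHook like the following: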

public class BaseHook extends BaseTaskHook {

    private Logger logger;
    private String topology;
    private String component;

    public BaseHook(String component) {
        this.component = component;
    }

    @Override
    public void prepare(Map conf, TopologyContext context) {
        logger = LoggerFactory.getLogger(this.getClass());
        this.topology = (String) conf.get("topology.name");
    }

    @Override
    public void emit(EmitInfo info) {
        log("EMITTED >> Value = " + info.values);
    }

    @Override
    public void spoutAck(SpoutAckInfo info) {
        log("ACKED >> Tuple = " + info.messageId + ", Latency = " + info.completeLatencyMs);
    }

    @Override
    public void spoutFail(SpoutFailInfo info) {
        log("FAILED >> Tuple = " + info.messageId + ", Latency = " + info.failLatencyMs);
    }

    @Override
    public void boltExecute(BoltExecuteInfo info) {
        log("EXECUTED >> Tuple = " + info.tuple.getValues() + ", Latency = " + info.executeLatencyMs);
    }

    @Override
    public void boltAck(BoltAckInfo info) {
        log("ACKED >> Tuple = " + info.tuple.getValues() + ", Latency = " + info.processLatencyMs);
    }

    @Override
    public void boltFail(BoltFailInfo info) {
        log("FAILED >> Tuple = " + info.tuple.getValues() + ", Latency = " + info.failLatencyMs);
    }

    private void log(String msg) {
        logger.info(">>>>> " + topology + " >> " + component + " >> " + msg);
    }
}

It turns out my hunch was right. The logs look like this:

>>>>> TopologyX >> SpoutX >> EMITTED >> Value = [XXXXXXXXX]
>>>>> TopologyX >> BoltX >> ACKED >> Tuple = [XXXXXXXXX], Latency = 1972
>>>>> TopologyX >> BoltX >> EXECUTED >> Tuple = [XXXXXXXXX], Latency = 1973
>>>>> TopologyX >> SpoutX >> FAILED >> Tuple = XXXXXXXXX, Latency = 53913

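That is, the bolt takes almost 2 seconds (to execute and ack), but the spout's fail is only called after about 53 seconds (almost topology.message.timeout.secs * 2).

I would expect the spout's ack to be called within 2-3 seconds as well. The spout is non-blocking, and both the spout and the bolt have enough workers.

Does anyone have a hint as to what the cause might be?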


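UPDATE

So here is what the Storm cluster looks like:

  • 4 topologies
    • T1 = S > B > B > B > Ack/Fail
    • T2 = S > B > Ack/Fail
    • T3 = S > B > B > Ack/Fail
    • T4 =
      • S > B > Ack/Fail
      • S > B > Ack/Fail

So the topology in question is T4, i.e. the one with 2 different spouts and 2 bolts. One of the two flows usually works fine (they use different messageIds that uniquely identify their tuples).

Could this be the problem?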

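Anyway,

  • We tried reducing the executors as far as we could, but that did not improve anything for T4.
  • We disabled all the other topologies, and T4 worked really well.
  • We enabled T1, and T4 still worked fine.
  • We enabled T2 (and, on other occasions, T3), and T4 started failing.

Now,

  • On one random occasion, T4 even worked alongside T1 and T3.
  • But otherwise, every time T2 or T3 is enabled, T4 fails.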

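Points to note:

  • T3 and T4 are both fast topologies, i.e. their flows complete in < 100 ms
  • Both T3 and T4 have only 1 executor per spout and per bolt
  • Both T3 and T4 have Max Tuple Pending = 1
  • We want to throttle both T3 and T4 (but have also tried running without throttling)
    • Attempt 1: no throttling at all
    • Attempt 2: sleep 50 ms before emitting
    • Attempt 3: sleep 50 ms after emitting
    • Attempt 4: do not sleep, but emit only when 50 seconds have passed since the last emit
    • Nothing worked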
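
For illustration, the non-sleeping throttle of attempt 4 could look roughly like the sketch below. It is not the code actually used in T3/T4: the class name EmitGate, the method tryEmit, and the interval parameter are hypothetical, and the sketch has no Storm dependency so it can be run on its own.

```java
// Hypothetical helper (not part of Storm): decides whether enough time has
// passed since the last emit. It never blocks the calling thread.
public class EmitGate {
    private final long minIntervalMs; // minimum gap between two emits
    private long lastEmitMs;          // timestamp of the last allowed emit
    private boolean emittedOnce = false;

    public EmitGate(long minIntervalMs) {
        if (minIntervalMs < 0) {
            throw new IllegalArgumentException("minIntervalMs must be >= 0");
        }
        this.minIntervalMs = minIntervalMs;
    }

    /**
     * Returns true (and records nowMs) if an emit is allowed at nowMs,
     * false if the caller should skip this round.
     */
    public boolean tryEmit(long nowMs) {
        if (emittedOnce && nowMs - lastEmitMs < minIntervalMs) {
            return false; // too early: skip instead of sleeping
        }
        lastEmitMs = nowMs;
        emittedOnce = true;
        return true;
    }

    public static void main(String[] args) {
        EmitGate gate = new EmitGate(50);
        System.out.println(gate.tryEmit(0));  // first emit is always allowed
        System.out.println(gate.tryEmit(49)); // inside the 50 ms window
        System.out.println(gate.tryEmit(50)); // window has elapsed
    }
}
```

In a spout, nextTuple() would call something like gate.tryEmit(System.currentTimeMillis()) and simply return when it gets false, so the calling thread is never parked.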

Additional info based on comments

All the spouts extend the BaseSpout class:

public abstract class BaseSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        context.addTaskHook(new BaseHook(this.getClass().getSimpleName()));
        try {
            this.collector = collector;
            open();
        } catch (Exception e) {
            throw new RuntimeException("Error when preparing spout", e);
        }
    }

    @Override
    public void nextTuple() {
        try {
            getTuple();
        } catch (Throwable t) {
            if (!(t instanceof FailedException)) {
                t = new FailedException("nextTuple()", t);
            }
            collector.reportError(t);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] fields = getFields();
        if (fields != null) {
            declarer.declare(new Fields(fields));
        }
    }

    protected void emit(Values values, String msgId) {
        collector.emit(values, msgId);
    }

    protected abstract void open() throws Exception;

    protected abstract void getTuple() throws Exception;

    protected abstract String[] getFields();
}

All the bolts extend the BaseBolt class:

public abstract class BaseBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {   
        context.addTaskHook(new BaseHook(this.getClass().getSimpleName()));
        try {
            this.collector = collector;
            prepare();
        } catch (Exception e) {
            throw new RuntimeException("Error when preparing bolt", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            process(tuple);
            collector.ack(tuple);
        } catch (Throwable t) {
            if (!(t instanceof FailedException)) {
                t = new FailedException("execute(" + tuple + ")", t);
            }
            collector.reportError(t);
            collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] fields = getFields();
        if (fields != null) {
            declarer.declare(new Fields(fields));
        }
    }

    protected void emit(Tuple tuple, Values values) {
        collector.emit(tuple, values);
    }

    protected abstract void prepare() throws Exception;

    protected abstract void process(Tuple tuple) throws Exception;

    protected abstract String[] getFields();
}

So there is no possibility of emitting tuples without a messageId (from a spout) or unanchored tuples (from a bolt).

Tom*_*per 0

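The problem here is that the calls to Spout.nextTuple() and to Spout.ack() and/or Spout.fail() all happen on the same thread. If you are pulling a large number of tuples into the topology, the ack or fail messages end up waiting to be processed by the source spout, which leads to extended ack/fail latencies.

You also mention that "sleeping" had no effect. If you mean that you called Thread.sleep() in the spout's nextTuple() method, that will only make things worse, because you are stalling the very thread that processes the acks/fails.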
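
To make the effect concrete, here is a minimal, self-contained simulation of a single thread that alternates between handling acks and calling nextTuple(). It is a toy model with simulated time, not Storm's actual executor code; the class SpoutLoopSim and all of its parameters are hypothetical. It only shows that the longer each nextTuple() call occupies the thread, the longer an already-arrived ack waits.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (not Storm code): one thread handles acks, then calls nextTuple().
public class SpoutLoopSim {

    /**
     * @param ackArrivalsMs   simulated times at which acks reach the spout, ascending
     * @param nextTupleCostMs how long each nextTuple() call holds the thread
     * @param endMs           simulated time at which the loop stops
     * @return the longest time any ack waited before the thread handled it
     */
    public static long maxAckDelayMs(long[] ackArrivalsMs, long nextTupleCostMs, long endMs) {
        if (nextTupleCostMs <= 0) {
            throw new IllegalArgumentException("nextTupleCostMs must be > 0");
        }
        Deque<Long> pending = new ArrayDeque<>();
        for (long arrival : ackArrivalsMs) {
            pending.add(arrival);
        }
        long now = 0;
        long maxDelay = 0;
        while (now <= endMs) {
            // Handle every ack that has already arrived.
            while (!pending.isEmpty() && pending.peek() <= now) {
                long arrival = pending.poll();
                maxDelay = Math.max(maxDelay, now - arrival);
            }
            // nextTuple() runs on the same thread; acks cannot be handled meanwhile.
            now += nextTupleCostMs;
        }
        return maxDelay;
    }

    public static void main(String[] args) {
        long[] arrivals = {10, 20, 30};
        // Fast nextTuple(): acks are handled as soon as they arrive.
        System.out.println(maxAckDelayMs(arrivals, 1, 100));  // prints 0
        // nextTuple() sleeping 50 ms: the ack that arrived at t=10 waits until t=50.
        System.out.println(maxAckDelayMs(arrivals, 50, 100)); // prints 40
    }
}
```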