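In one of our topologies, which has 1 spout and 1 bolt, I had a hunch that the bolt was completing (and acking) but the spout was still failing.
I tried to confirm this with a TaskHook like the following -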
public class BaseHook extends BaseTaskHook {
    private Logger logger;
    private String topology;
    private String component;

    public BaseHook(String component) {
        this.component = component;
    }

    @Override
    public void prepare(Map conf, TopologyContext context) {
        logger = LoggerFactory.getLogger(this.getClass());
        this.topology = (String) conf.get("topology.name");
    }

    @Override
    public void emit(EmitInfo info) {
        log("EMITTED >> Value = " + info.values);
    }

    @Override
    public void spoutAck(SpoutAckInfo info) {
        log("ACKED >> Tuple = " + info.messageId + ", Latency = " + info.completeLatencyMs);
    }

    @Override
    public void spoutFail(SpoutFailInfo info) {
        log("FAILED >> Tuple = " + info.messageId + ", Latency = " + info.failLatencyMs);
    }

    @Override
    public void boltExecute(BoltExecuteInfo info) {
        log("EXECUTED >> Tuple = " + info.tuple.getValues() + ", Latency = " + info.executeLatencyMs);
    }

    @Override
    public void boltAck(BoltAckInfo info) {
        log("ACKED >> Tuple = " + info.tuple.getValues() + ", Latency = " + info.processLatencyMs);
    }

    @Override
    public void boltFail(BoltFailInfo info) {
        log("FAILED >> Tuple = " + info.tuple.getValues() + ", Latency = " + info.failLatencyMs);
    }

    private void log(String msg) {
        logger.info(">>>>> " + topology + " >> " + component + " >> " + msg);
    }
}
It turns out my hunch was correct. The logs look like this -
>>>>> TopologyX >> SpoutX >> EMITTED >> Value = [XXXXXXXXX]
>>>>> TopologyX >> BoltX >> ACKED >> Tuple = [XXXXXXXXX], Latency = 1972
>>>>> TopologyX >> BoltX >> EXECUTED >> Tuple = [XXXXXXXXX], Latency = 1973
>>>>> TopologyX >> SpoutX >> FAILED >> Tuple = XXXXXXXXX, Latency = 53913
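That is, the bolt takes almost 2 seconds (to execute and ack), but the spout's fail is called after about 53 seconds (almost topology.message.timeout.secs * 2).
I would expect the spout's ack to be called within 2-3 seconds as well. The spout is non-blocking, and both the spout and the bolt have enough capacity.
Does anyone have a hint as to what the cause might be?
So here is what the Storm cluster looks like -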
T1 = S > B > B > B > Ack/Fail
T2 = S > B > Ack/Fail
T3 = S > B > B > Ack/Fail
T4 =
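So the topology in question is T4, i.e. the one with 2 different spouts and 2 bolts. One of the flows usually works fine (the flows use different messageIds to uniquely identify their tuples).
Could this be the problem?
Anyway,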
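- T4 used to run fine alongside T1, but with T2 (and on other occasions T3) also running, T4 has now started failing.
- T4 even works alongside T1 and T3. Whenever T2 or T3 is enabled, T4 breaks down.
Points to note -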
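- T3 and T4 are both fast topologies, i.e. their flows complete in < 100 ms
- T3 and T4 have only 1 executor per spout and bolt
- T3 and T4 have Max Tuple Pending = 1
- T3 and T4 are rate limited (but I have also tried without limiting the rate)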
All spouts extend the BaseSpout class -
public abstract class BaseSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        context.addTaskHook(new BaseHook(this.getClass().getSimpleName()));
        try {
            this.collector = collector;
            open();
        } catch (Exception e) {
            throw new RuntimeException("Error when preparing spout", e);
        }
    }

    @Override
    public void nextTuple() {
        try {
            getTuple();
        } catch (Throwable t) {
            if (!(t instanceof FailedException)) {
                t = new FailedException("nextTuple()", t);
            }
            collector.reportError(t);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] fields = getFields();
        if (fields != null) {
            declarer.declare(new Fields(fields));
        }
    }

    protected void emit(Values values, String msgId) {
        collector.emit(values, msgId);
    }

    protected abstract void open() throws Exception;

    protected abstract void getTuple() throws Exception;

    protected abstract String[] getFields();
}
All bolts extend the BaseBolt class -
public abstract class BaseBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        context.addTaskHook(new BaseHook(this.getClass().getSimpleName()));
        try {
            this.collector = collector;
            prepare();
        } catch (Exception e) {
            throw new RuntimeException("Error when preparing bolt", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            process(tuple);
            collector.ack(tuple);
        } catch (Throwable t) {
            if (!(t instanceof FailedException)) {
                t = new FailedException("execute(" + tuple + ")", t);
            }
            collector.reportError(t);
            collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] fields = getFields();
        if (fields != null) {
            declarer.declare(new Fields(fields));
        }
    }

    protected void emit(Tuple tuple, Values values) {
        collector.emit(tuple, values);
    }

    protected abstract void prepare() throws Exception;

    protected abstract void process(Tuple tuple) throws Exception;

    protected abstract String[] getFields();
}
So there is no possibility of emitting a tuple without a messageId (from a spout) or an unanchored tuple (from a bolt).
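The problem here is that the calls to Spout.nextTuple() and to Spout.ack() and/or Spout.fail() all happen on the same thread. If you pull a large number of tuples into the topology, the ack or fail messages end up waiting for the source spout to process them, which stretches the latency of the ack/fail.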
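To make the single-thread effect concrete, here is a minimal, Storm-free sketch. It models one thread that alternates between handling acks that have already arrived and calling nextTuple(), using a virtual clock so the result is deterministic. The class SpoutLoopSketch, the method maxAckWaitMs and all the numbers are made up for illustration; this is a simplified model, not Storm's actual executor code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a spout thread: it is NOT Storm code.
class SpoutLoopSketch {

    /**
     * Simulates one thread that alternates between draining acks and
     * calling nextTuple(). Time is virtual (plain counters, no sleeping).
     *
     * @param ackArrivalsMs   ascending times at which acks reach the spout
     * @param nextTupleCostMs how long each nextTuple() call occupies the thread
     * @return the longest time any ack waited before the thread handled it
     */
    static long maxAckWaitMs(long[] ackArrivalsMs, long nextTupleCostMs) {
        if (nextTupleCostMs <= 0) {
            throw new IllegalArgumentException("nextTupleCostMs must be > 0");
        }
        Deque<Long> waiting = new ArrayDeque<>();
        for (long arrival : ackArrivalsMs) {
            waiting.addLast(arrival);
        }

        long clock = 0;
        long maxWait = 0;
        while (!waiting.isEmpty()) {
            // Step 1: handle every ack that has arrived by now.
            while (!waiting.isEmpty() && waiting.peekFirst() <= clock) {
                long arrival = waiting.pollFirst();
                maxWait = Math.max(maxWait, clock - arrival);
            }
            // Step 2: nextTuple() runs on the same thread, so any ack that
            // arrives during this call has to wait until it returns.
            clock += nextTupleCostMs;
        }
        return maxWait;
    }

    public static void main(String[] args) {
        long[] acks = {5, 10, 15};
        System.out.println(maxAckWaitMs(acks, 1));    // quick nextTuple()
        System.out.println(maxAckWaitMs(acks, 1000)); // slow nextTuple()
    }
}
```

In this model a 1 ms nextTuple() handles each ack as soon as it arrives, while a 1000 ms nextTuple() leaves the first ack waiting 995 ms, even though nothing downstream got any slower.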
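You also mention that "sleeping" has no effect. If you mean that you call Thread.sleep() inside the spout's nextTuple() method, that only makes things worse, because you are stalling the very thread that processes the acks/fails.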
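If the sleep is there to limit the emit rate, one possible alternative is to have nextTuple() return immediately when it is too early to emit, instead of blocking. The sketch below is only an illustration under that assumption: EmitGate and tryAcquire are hypothetical names, not part of Storm, and the caller supplies the current time so the logic can be checked without waiting.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a non-blocking rate gate. It is NOT part of Storm.
class EmitGate {
    private final long minIntervalNanos;
    private long lastEmitNanos;
    private boolean emittedBefore = false;

    EmitGate(long minInterval, TimeUnit unit) {
        this.minIntervalNanos = unit.toNanos(minInterval);
    }

    /**
     * Returns true if an emit is allowed at the given time and records it.
     * Never blocks: when it is too early, it simply returns false.
     *
     * @param nowNanos current time, e.g. from System.nanoTime()
     */
    boolean tryAcquire(long nowNanos) {
        // Compare by difference so that negative nanoTime values also work.
        if (emittedBefore && nowNanos - lastEmitNanos < minIntervalNanos) {
            return false;
        }
        emittedBefore = true;
        lastEmitNanos = nowNanos;
        return true;
    }
}
```

Inside a spout this could be used as `if (!gate.tryAcquire(System.nanoTime())) return;` at the top of nextTuple(), so the thread stays free to handle acks and fails between emits.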