Nic*_*las 2 apache-kafka apache-kafka-streams
使用kafka-streams 0.10.0.0,转发消息时,我会定期在StreamTask中看到空指针异常。它在调用的10%到50%之间变化。NPE发生在这种方法中:
public <K, V> void forward(K key, V value) {
ProcessorNode thisNode = currNode;
try {
for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
currNode = childNode;
childNode.process(key, value);
}
} finally {
currNode = thisNode;
}
}
Run Code Online (Sandbox Code Playgroud)
似乎在某些情况下,thisNode字段为null。知道是什么原因造成的吗?堆栈跟踪如下。
[ERROR] 2016-08-21 14:50:39.288 [StreamThread-1] StreamedMetricMeter - Forwarding failed
java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336) ~[kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) ~[kafka-streams-0.10.0.0.jar:?]
at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.forward(AbstractStreamedMetricProcessor.java:552) [classes/:?]
at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:89) [classes/:?]
at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:1) [classes/:?]
at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.process(AbstractStreamedMetricProcessor.java:166) [classes/:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) [kafka-streams-0.10.0.0.jar:?]
Run Code Online (Sandbox Code Playgroud)
问题是,我ProcessorSupplier小号是为每次调用返回处理器的同一个实例获得。反过来,Kafka Streams引擎正在尝试创建多个处理器实例,我无疑会创建多处理器垃圾箱。注意类似的粗心。。。ProcessorSupplier.get()应该在每次调用时返回一个新的处理器实例。
| 归档时间: |
|
| 查看次数: |
569 次 |
| 最近记录: |