Sau*_*abh 5 tuples apache-kafka apache-storm
Version Info:
"org.apache.storm" % "storm-core" % "1.2.1"
"org.apache.storm" % "storm-kafka-client" % "1.2.1"
Run Code Online (Sandbox Code Playgroud)
我有一个风暴拓扑,如下所示:
螺栓A -> 螺栓B -> 螺栓C -> 螺栓D
boltA只是对请求进行一些格式化并发出另一个元组。boltB做一些处理并为每个接受的元组发出大约 100 个元组。boltC并boltD处理这些元组。所有的螺栓都执行BaseBasicBolt。
我注意到的是,每当通过 throwing 将boltD某些标记tuple为失败并标记为重试时FailedException,在比拓扑超时少几分钟后,我收到以下错误:
2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
Run Code Online (Sandbox Code Playgroud)
似乎正在发生的事情是当boltB发出 1 个元boltD组中的 100 个并且使这 100 个元组中的一个元组失败时会发生这种情况,我收到此错误。无法理解如何解决这个问题,理想情况下,ack当所有 100 个元组都为 时,它应该是原始元组acked,但可能原始元组acked在所有 100 个元组之前acked,这会导致此错误。
编辑:
我可以使用以下拓扑和两个螺栓来重现它,它在集群模式下运行大约 5 分钟后重现:
螺栓A
case class Abc(index: Int, rand: Boolean)
class BoltA extends BaseBasicBolt {
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
val inp = input.getBinaryByField("value").getObj[someObj]
val randomGenerator = new Random()
var i = 0
val rand = randomGenerator.nextBoolean()
1 to 100 foreach {
collector.emit(new Values(Abc(i, rand).getJsonBytes))
i += 1
}
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
declarer.declare(new Fields("boltAout"))
}
}
Run Code Online (Sandbox Code Playgroud)
螺栓B
class BoltB extends BaseBasicBolt {
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
val abc = input.getBinaryByField("boltAout").getObj[Abc]
println(s"Received ${abc.index}th tuple in BoltB")
if(abc.index >= 97 && abc.rand){
println(s"throwing FailedException for ${abc.index}th tuple for")
throw new FailedException()
}
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
}
}
Run Code Online (Sandbox Code Playgroud)
卡夫卡喷口:
private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
.setOffsetCommitPeriodMs(100)
.setRetry(new KafkaSpoutRetryExponentialBackoff(
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
10,
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
))
.setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
.setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
.build()
Run Code Online (Sandbox Code Playgroud)
其他配置:
消息超时秒数:300
@Stig Rohde D\xc3\xb8ssing 在此提供了此问题的修复。问题的具体原因已描述如下:
\n\n\n在 STORM-2666 及后续版本的修复中,我们添加了逻辑来处理在后续偏移量已被确认后 spout 收到偏移量确认的情况。问题是 spout 可能会提交所有已确认的偏移量,但不会向前调整消费者位置,或正确清除 waitingToEmit。如果确认的偏移量远远落后于日志结束偏移量,则 spout 可能最终会轮询其已提交的偏移量。
\n修复略有错误。当消费者位置落后于提交的偏移量时,我们确保向前调整位置,并清除提交的偏移量后面的所有 waitingToEmit 消息。除非我们调整消费者的位置,否则我们不会清除 waitingToEmit,这结果是一个问题。
\n例如,假设偏移量 1 失败,偏移量 2-10 已被确认,maxPollRecords 为 10。假设 Kafka 中有 11 条记录 (1-11)。如果 spout 寻求返回偏移量 1 来重播它,它将从轮询中的消费者那里获取偏移量 1-10。消费者位置现在是 11。spout 发出偏移量 1。假设它立即被确认。在下一次轮询中,spout 将提交偏移量 1-10 并检查是否应该调整消费者位置和 waitingToEmit。由于位置 (11) 位于提交的偏移量 (10) 之前,因此它不会清除 waitingToEmit。由于 waitingToEmit 仍包含上次轮询的偏移量 2-10,因此 spout 最终将再次发出这些元组。
\n
人们可以在这里看到修复方法。
\n| 归档时间: |
|
| 查看次数: |
728 次 |
| 最近记录: |