sob*_*cko 7 apache-kafka apache-kafka-streams
我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 。当发生错误时,通过成功记录错误并继续,它可以正常工作。但是,假设我的传入消息有连续的错误流,我停止并重新启动 kafka 流应用程序,然后我看到失败并已在上次尝试中记录的消息再次重新出现(它们正在被记录)再次)。如果我尝试将错误的消息发送到 DLQ,问题会更大。重新启动时,它们会再次发送到 DLQ。一旦我有一个良好的记录,看起来偏移量会进一步移动,并且在另一次重新启动时不会再次看到已经记录的消息。有没有办法在流应用程序中手动提交?我尝试使用 ProcessorContext#commit(),但这似乎没有任何效果。
我通过运行此处提供的示例重现了此行为:https ://github.com/confluenceinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluence/examples/streams/WordCountLambdaExample .java
我将传入值 Serde 更改为,Serdes.Integer().getClass().getName()以强制输入出现反序列化错误,并将提交间隔减少到仅 1 秒。还将以下内容添加到配置中。
streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);。
一旦失败,当我重新启动应用程序时,之前失败的相同记录会再次出现在日志中。例如,每次重新启动应用程序时,我都会在控制台上看到以下输出。我希望不会再次尝试这些,因为我们之前已经跳过了它们。
2018-01-27 15:24:37,591 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 113
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
2018-01-27 15:24:37,592 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 114
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
看起来当反序列化异常发生时,这个标志永远不会在这里设置为true:https ://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor /internals/StreamTask.java#L228。似乎只有处理成功后,它才成为现实。这可能就是为什么即使在我手动调用processorContext#commit()之后提交也没有发生的原因。
感谢对此问题的任何帮助。
谢谢。
| 归档时间: |
|
| 查看次数: |
5312 次 |
| 最近记录: |