Thi*_*iru 4 java apache-kafka-streams spring-kafka
曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。
我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等,
setUncaughtExceptionHandler吗?或者,还有更好的方法?提前致谢!!
这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异常(例如,由于网络故障或kafka代理已死),则流将默认失效。在kafka-streams 1.1.0版中,您可以通过ProductionExceptionHandler以下方式实现覆盖默认行为:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]", new String(record.value()), record.topic(), exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map<String, ?> configs) {
}
}
Run Code Online (Sandbox Code Playgroud)
从句柄方法,CONTINUE如果您不希望流因异常而死,则可以返回,或者在返回FAIL时以防流停止(FAIL是默认值之一)。并且您需要在流配置中指定此类:
default.production.exception.handler=com.example.CustomProductionExceptionHandler
Run Code Online (Sandbox Code Playgroud)
另外要注意的是ProductionExceptionHandler手柄只在生产异常,以及与流方法处理邮件时不会处理异常mapValues(..),filter(..),branch(..)等你需要用与try / catch块,这些方法的逻辑(把所有的方法逻辑到try块来保证您将处理所有例外情况):
.filter((key, value) -> { try {..} catch (Exception e) {..} })
Run Code Online (Sandbox Code Playgroud)
据我所知,我们不需要在用户端显式处理异常,因为kafka流稍后会自动重试(因为偏移量直到消息被使用和处理后才会更改);例如,如果kafka代理在一段时间内无法访问,您将从kafka流中获取异常,并且当断开连接时,kafka流将消耗所有消息。因此在这种情况下,我们将只是延迟而没有损坏/丢失。
使用,setUncaughtExceptionHandler您将无法更改默认行为,例如使用ProductionExceptionHandler,则只能记录错误或将消息发送到失败主题。
对于消费者端的异常处理,
1) 您可以使用以下属性在生产者中添加默认异常处理程序。
"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";
Run Code Online (Sandbox Code Playgroud)
基本上 apache 提供了三个异常处理程序类:
1)LogAndContiuneExceptionHandler,您可以将其视为
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
Run Code Online (Sandbox Code Playgroud)
2)LogAndFailExceptionHandler
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
Run Code Online (Sandbox Code Playgroud)
3)LogAndSkipOnInvalidTimestamp
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndSkipOnInvalidTimestamp.class);
Run Code Online (Sandbox Code Playgroud)
对于自定义异常处理,
1)您可以实现DeserializationExceptionHandler接口并重写handle()方法。
2)或者您可以扩展上述课程。
| 归档时间: |
|
| 查看次数: |
4243 次 |
| 最近记录: |