在Kafka流中处理异常

Thi*_*iru 4 java apache-kafka-streams spring-kafka

曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。

我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等,

  • 有人可以建议正确的做法吗?我应该使用 setUncaughtExceptionHandler吗?或者,还有更好的方法?
  • 如何处理重试?

提前致谢!!

Vas*_*kyi 7

这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异常(例如,由于网络故障或kafka代理已死),则流将默认失效。在kafka-streams 1.1.0版中,您可以通过ProductionExceptionHandler以下方式实现覆盖默认行为:

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]",  new String(record.value()), record.topic(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
    }

}
Run Code Online (Sandbox Code Playgroud)

从句柄方法,CONTINUE如果您不希望流因异常而死,则可以返回,或者在返回FAIL时以防流停止(FAIL是默认值之一)。并且您需要在流配置中指定此类:

default.production.exception.handler=com.example.CustomProductionExceptionHandler
Run Code Online (Sandbox Code Playgroud)

另外要注意的是ProductionExceptionHandler手柄只在生产异常,以及与流方法处理邮件时不会处理异常mapValues(..)filter(..)branch(..)等你需要用与try / catch块,这些方法的逻辑(把所有的方法逻辑到try块来保证您将处理所有例外情况):

.filter((key, value) -> { try {..} catch (Exception e) {..} })
Run Code Online (Sandbox Code Playgroud)

据我所知,我们不需要在用户端显式处理异常,因为kafka流稍后会自动重试(因为偏移量直到消息被使用和处理后才会更改);例如,如果kafka代理在一段时间内无法访问,您将从kafka流中获取异常,并且当断开连接时,kafka流将消耗所有消息。因此在这种情况下,我们将只是延迟而没有损坏/丢失。

使用,setUncaughtExceptionHandler您将无法更改默认行为,例如使用ProductionExceptionHandler,则只能记录错误或将消息发送到失败主题。

  • 如果在处理消息期间(例如在.mapValues(..)、. filter(..)`中的逻辑期间)或在生成到目标(接收器)消息的消息期间发生异常(例如通过网络问题),则您的异常流将死。您可以通过在处理器期间引发任何异常来对其进行测试。所有未捕获的异常都将导致kafka流死亡 (2认同)

Vis*_*war 6

对于消费者端的异常处理,

1) 您可以使用以下属性在生产者中添加默认异常处理程序。

"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";
Run Code Online (Sandbox Code Playgroud)

基本上 apache 提供了三个异常处理程序类:

1)LogAndContiuneExceptionHandler,您可以将其视为

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndContinueExceptionHandler.class);
Run Code Online (Sandbox Code Playgroud)

2)LogAndFailExceptionHandler

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndFailExceptionHandler.class);
Run Code Online (Sandbox Code Playgroud)

3)LogAndSkipOnInvalidTimestamp

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndSkipOnInvalidTimestamp.class);
Run Code Online (Sandbox Code Playgroud)

对于自定义异常处理,

1)您可以实现DeserializationExceptionHandler接口并重写handle()方法。

2)或者您可以扩展上述课程。