相关疑难解决方法(0)

使用Kafka的Streams API处理错误消息

我有一个基本的流处理流程,看起来像

master topic -> my processing in a mapper/filter -> output topics
Run Code Online (Sandbox Code Playgroud)

我想知道处理"坏消息"的最佳方法.这可能是我无法正确反序列化的消息,或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有这种类型的瞬态错误).

我正在考虑将所有处理/过滤代码包装在try catch中,如果出现异常,则路由到"错误主题".然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master.如果我让任何异常传播,则流似乎被卡住并且不再拾取消息.

  • 这种方法被认为是最佳做法吗?
  • 有没有方便的Kafka溪流来处理这个问题?我不认为有DLQ的概念......
  • 什么是阻止卡夫卡干扰"坏消息"的替代方法?
  • 有哪些替代错误处理方法?

为了完整性,这里是我的代码(伪ish):

class Document {
    // Fields
}

class AnalysedDocument {

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() { …
Run Code Online (Sandbox Code Playgroud)

error-handling apache-kafka apache-kafka-streams

31
推荐指数
2
解决办法
2万
查看次数