我有一个基本的流处理流程,看起来像
master topic -> my processing in a mapper/filter -> output topics
Run Code Online (Sandbox Code Playgroud)
我想知道处理"坏消息"的最佳方法.这可能是我无法正确反序列化的消息,或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有这种类型的瞬态错误).
我正在考虑将所有处理/过滤代码包装在try catch中,如果出现异常,则路由到"错误主题".然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master.如果我让任何异常传播,则流似乎被卡住并且不再拾取消息.
为了完整性,这里是我的代码(伪ish):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() { …Run Code Online (Sandbox Code Playgroud)