bm1*_*729 31 error-handling apache-kafka apache-kafka-streams
我有一个基本的流处理流程,看起来像
master topic -> my processing in a mapper/filter -> output topics
Run Code Online (Sandbox Code Playgroud)
我想知道处理"坏消息"的最佳方法.这可能是我无法正确反序列化的消息,或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有这种类型的瞬态错误).
我正在考虑将所有处理/过滤代码包装在try catch中,如果出现异常,则路由到"错误主题".然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master.如果我让任何异常传播,则流似乎被卡住并且不再拾取消息.
为了完整性,这里是我的代码(伪ish):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() {
@Override
public AnalysedDocument apply(String rawValue) {
Document document;
try {
// Deserialise
document = ...
} catch (Exception e) {
return new AnalysedDocument(rawValue, exception);
}
try {
// Perform analysis
Analysis analysis = ...
return new AnalysedDocument(document, analysis);
} catch (Exception e) {
return new AnalysedDocument(document, exception);
}
}
});
// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Run Code Online (Sandbox Code Playgroud)
任何帮助非常感谢.
Mat*_*Sax 28
目前,Kafka Streams仅提供有限的错误处理功能.正在进行的工作是为了简化这一过程.目前,您的整体方法似乎是一个很好的方法.
关于处理de/serialization错误的一条评论:手动处理这些错误,需要您"手动"进行de/serialization.这意味着,您需要为ByteArraySerdeStreams应用程序的输入/输出主题配置密钥和值,并添加一个map()执行de/serialization(即,KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType>如果您还想捕获序列化异常,则反之亦然) .否则,您无法try-catch反序列化异常.
使用您当前的方法,您"仅"验证给定字符串表示有效文档 - 但情况可能是,消息本身已损坏且无法String在源操作符中首先转换为a .因此,您实际上并未使用代码覆盖反序列化异常.但是,如果您确定反序列化异常永远不会发生,那么您的方法也是足够的.
更新
这个问题通过KIP-161解决,并将包含在下一个版本1.0.0中.它允许您通过参数注册回调default.deserialization.exception.handler.每次在反序列化期间发生异常时都会调用该处理程序,并允许您返回DeserializationResponse(CONTINUE- >删除记录,或者FAIL这是默认值).
更新2
使用KIP-210(将成为Kafka 1.1的一部分),通过注册可以返回的ProductionExceptionHandlervia配置default.production.exception.handler,也可以处理生产者方面的错误,类似于消费者部分CONTINUE.
Mic*_*oll 27
更新于2018年3月23日: Kafka 1.0提供了更好,更容易处理错误消息("毒丸")通过KIP-161比我下面描述的.请参阅Kafka 1.0文档中的default.deserialization.exception.handler.
这可能是我无法正确反序列化的消息[...]
好的,我的答案主要关注(de)序列化问题,因为这可能是大多数用户处理的最棘手的场景.
[...]或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖性,所以不应该有那种瞬态错误).
同样的思考(用于反序列化)也可以应用于处理逻辑中的失败.在这里,大多数人倾向于倾向于下面的选项2(减去反序列化部分),但是YMMV.
我正在考虑将所有处理/过滤代码包装在try catch中,如果出现异常,则路由到"错误主题".然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master.如果我让任何异常传播,则流似乎被卡住并且不再拾取消息.
- 这种方法被认为是最佳做法吗?
是的,目前这是要走的路.本质上,两种最常见的模式是(1)跳过损坏的消息或(2)将损坏的记录发送到隔离主题,即死信队列.
- 有没有方便的Kafka溪流来处理这个问题?我不认为有DLQ的概念......
是的,有办法处理这个问题,包括使用死信队列.然而,它(至少恕我直言)还不方便.如果您对API如何允许您处理此问题有任何反馈 - 例如通过新的或更新的方法,配置设置("如果序列化/反序列化失败,则将有问题的记录发送到此隔离主题") - 请让我们知道.:-)
- 什么是阻止卡夫卡干扰"坏消息"的替代方法?
- 有哪些替代错误处理方法?
请参阅下面的示例.
FWIW,Kafka社区还在讨论添加一个新的CLI工具,允许您跳过已损坏的消息.但是,作为Kafka Streams API的用户,我认为您希望直接在代码中处理此类方案,并且仅作为最后的手段回退到CLI实用程序.
以下是Kafka Streams DSL处理损坏的记录/消息(称为"毒丸")的一些模式.这取自http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
选项1:使用跳过损坏的记录 flatMap
这可以说是大多数用户想要做的事情.
flatMap它是因为它允许您为每个输入记录输出零个,一个或多个输出记录.在记录损坏的情况下,我们不输出任何内容(零记录),从而忽略/跳过损坏的记录.flatMap"标记"潜在数据重新分区的输入流,即如果您执行基于密钥的操作(如分组(groupBy/ groupByKey)或之后的连接),您的数据将在后台重新分区.由于这可能是一个代价高昂的步骤,我们不希望这种情况不必要地发生.如果您知道记录键始终有效或者您不需要对键进行操作(从而将它们保留为byte[]格式的"原始"键),则可以从更改flatMap为flatMapValues,这不会导致数据重新分区即使您稍后加入/分组/聚合流.代码示例:
Serde<byte[]> bytesSerde = Serdes.ByteArray();
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
// Input topic, which might contain corrupted messages
KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, inputTopic);
// Note how the returned stream is of type KStream<String, Long>,
// rather than KStream<byte[], byte[]>.
KStream<String, Long> doubled = input.flatMap(
(k, v) -> {
try {
// Attempt deserialization
String key = stringSerde.deserializer().deserialize(inputTopic, k);
long value = longSerde.deserializer().deserialize(inputTopic, v);
// Ok, the record is valid (not corrupted). Let's take the
// opportunity to also process the record in some way so that
// we haven't paid the deserialization cost just for "poison pill"
// checking.
return Collections.singletonList(KeyValue.pair(key, 2 * value));
}
catch (SerializationException e) {
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
}
return Collections.emptyList();
}
);
Run Code Online (Sandbox Code Playgroud)
选项2:死信队列 branch
与选项1(忽略损坏的记录)相比,选项2通过将它们从"主"输入流中过滤掉并将它们写入隔离主题(想想:死信队列)来保留损坏的消息.缺点是,对于有效记录,我们必须支付两次手动反序列化费用.
KStream<byte[], byte[]> input = ...;
KStream<byte[], byte[]>[] partitioned = input.branch(
(k, v) -> {
boolean isValidRecord = false;
try {
stringSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
}
catch (SerializationException ignored) {}
return isValidRecord;
},
(k, v) -> true
);
// partitioned[0] is the KStream<byte[], byte[]> that contains
// only valid records. partitioned[1] contains only corrupted
// records and thus acts as a "dead letter queue".
KStream<String, Long> doubled = partitioned[0].map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
// Don't forget to actually write the dead letter queue back to Kafka!
partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");
Run Code Online (Sandbox Code Playgroud)
选项3:使用跳过损坏的记录 filter
我只提到这个是完整的.此选项看起来像是选项1和2的混合,但比其中任何一个都差.与选项1相比,您必须为有效记录支付手动反序列化费用两次(不好!).与选项2相比,您将无法在死信队列中保留损坏的记录.
KStream<byte[], byte[]> validRecordsOnly = input.filter(
(k, v) -> {
boolean isValidRecord = false;
try {
bytesSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
}
catch (SerializationException e) {
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
}
return isValidRecord;
}
);
KStream<String, Long> doubled = validRecordsOnly.map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
Run Code Online (Sandbox Code Playgroud)
任何帮助非常感谢.
我希望我能帮忙.如果是,我将非常感谢您就如何改进Kafka Streams API以便以比现在更好/更方便的方式处理故障/异常的反馈.:-)
| 归档时间: |
|
| 查看次数: |
16423 次 |
| 最近记录: |