Shr*_*arg 1 apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka
您能否建议一下,当控件到达 catch 块时,如何停止发送到我的第三个 kafka 主题,当前消息被发送到错误主题以及正常处理时应发送到的主题。一段代码如下:
@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}
Run Code Online (Sandbox Code Playgroud)
这可以使用以下模式来实现:
KafkaClass stream;
return input -> input
.branch((k, v) -> {
try {
stream = processFunction();
return true;
}
catch (Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
return false;
}
},
(k, v) -> true)[0]
.map((k, v) -> new KeyValue<>(k, stream));
Run Code Online (Sandbox Code Playgroud)
在这里,我们使用 的分支功能 (API) 将KStream您的输入分成两个路径 - 正常流和导致错误的路径。这是通过为方法调用提供两个过滤器来实现的branch。第一个过滤器是调用方法processFunction并获取响应的正常流程。如果没有出现异常,过滤器将返回true,并且分支操作的结果将被捕获在输出数组的第一个元素中,[0]该元素将在map将最终结果发送到出站主题的操作中进行下游处理。
另一方面,如果它抛出异常,它会使用 发送任何必要的内容到错误主题,StreamBridge并且过滤器返回false。由于下游map操作仅对来自branching 的数组的第一个元素执行[0],因此不会发送任何内容。当第一个过滤器返回时false,它会转到总是返回的第二个过滤器true。这是一个无操作过滤器,其结果被完全忽略。
这种特定实现的一个缺点是,您需要将响应存储processFunction在实例字段中,然后对每个传入KStream记录进行变异,以便您可以在map发送输出的最终方法中访问其值。然而,对于这个特定的用例,这可能不是问题。