对于传入记录,我需要验证该值,并根据结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用 context.forward() 转发相同的错误。可以使用此链接中提供的 DSL 来完成
使用 kafka-streams 对 json 输入流进行条件排序
我没有在 handlerAPI 中找到明确的方法来执行此操作。
ValidateProcessor.java
@Override
public void process(String key, String value) {
Object result = //validation logic
if(result.isSuccessful()) {
context().forward(key, value);
}else {
context.forward("error",Object)
}
}
Run Code Online (Sandbox Code Playgroud)
现在调用者再次需要检查并根据关键需求来区分接收器主题。我正在使用processorAPI,因为我需要使用标头。
编辑 :
branch(new predicate{
business logic
if(condition)
return true
else
return false;
Run Code Online (Sandbox Code Playgroud)
当条件为假时如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记录。有没有办法在同一个谓词中做?