小编vin*_*dhi的帖子

Kafka Streams 处理器 API context.forward

对于传入记录,我需要验证该值,并根据结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用 context.forward() 转发相同的错误。可以使用此链接中提供的 DSL 来完成

使用 kafka-streams 对 json 输入流进行条件排序

我没有在 handlerAPI 中找到明确的方法来执行此操作。

    ValidateProcessor.java

    @Override
    public void process(String key, String value) {
        Object result = //validation logic
        if(result.isSuccessful()) {
            context().forward(key, value);
         }else {
            context.forward("error",Object)
        }

}
Run Code Online (Sandbox Code Playgroud)

现在调用者再次需要检查并根据关键需求来区分接收器主题。我正在使用processorAPI,因为我需要使用标头。

编辑 :

branch(new predicate{
 business logic 
 if(condition)
   return true
 else
   return false;
Run Code Online (Sandbox Code Playgroud)

当条件为假时如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记录。有没有办法在同一个谓词中做?

java apache-kafka apache-kafka-streams

7
推荐指数
1
解决办法
5063
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1

java ×1