Kafka Streams 处理器 API context.forward

vin*_*dhi 7 java apache-kafka apache-kafka-streams

对于传入记录,我需要验证该值,并根据结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用 context.forward() 转发相同的错误。可以使用此链接中提供的 DSL 来完成

使用 kafka-streams 对 json 输入流进行条件排序

我没有在 handlerAPI 中找到明确的方法来执行此操作。

    ValidateProcessor.java

    @Override
    public void process(String key, String value) {
        Object result = //validation logic
        if(result.isSuccessful()) {
            context().forward(key, value);
         }else {
            context.forward("error",Object)
        }

}
Run Code Online (Sandbox Code Playgroud)

现在调用者再次需要检查并根据关键需求来区分接收器主题。我正在使用processorAPI,因为我需要使用标头。

编辑 :

branch(new predicate{
 business logic 
 if(condition)
   return true
 else
   return false;
Run Code Online (Sandbox Code Playgroud)

当条件为假时如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记录。有没有办法在同一个谓词中做?

Mat*_*Sax 9

当您指定 时Topology,您可以为所有节点分配名称并连接它们:

Topology topology = new Topology();
topology.addSource("source", ...);
topology.addProcessor("X", ..., "source"); // connect source->X
topology.addSink("Y", ..., "X"); // connect X->Y
topology.addSink("Z", ..., "X"); // connect X->Z
Run Code Online (Sandbox Code Playgroud)

如果处理器“X”连接到下游处理器“Y”和“Z”,则可以使用节点名称将记录发送到“Y”或“Z”。如果您不指定名称,记录将发送到所有下游(“子”)处理器。

// this is `process()` of "X"
public void process(String key, String value) {
    context.forward(newKey, newValue); // send to both Y and Z
    context.forward(newKey, newValue, To.child("Y")); // send it only to Y
    context.forward(newKey, newValue, To.child("Z")); // send it only to Z
}
Run Code Online (Sandbox Code Playgroud)