如何从 ValueTransformer 中的 Punctuator 实例向下游转发事件?

Aur*_*ien 3 apache-kafka-streams

在 KafkaStream 中,在实现ValueTransformerValueTransformerWithKey 时,在调用transform()时,我安排了一个新的 Punctuator。当执行Punctuator 的punctuate()方法时,我希望它使用上下文实例向下游转发事件。然而,当 DSL 拓扑的一部分时,上下文实例似乎没有定义。

关于如何使用 Transformer 执行此操作的任何线索?

在处理器中使用相同的逻辑,实现其工作的低级处理器拓扑。

在 ValueTransformerWithKey 中:

@Override 
    public Event transform(final String key, final Event event) { 
        this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
        return null;
}
Run Code Online (Sandbox Code Playgroud)

在 MyPunctuator 中:

private class MytPunctuator implements Punctuator {
    private String key;
    private ProcessorContext context;
    private Event event;

    MyPunctuator(ProcessorContext context, String key, Event event)
    {
        this.context = context;
        this.key = key;
        this.event = event;
    }

    @Override
    public void punctuate(final long timestamp) {
        context.forward(key, AlertEvent.builder().withSource(event).build());
        context.commit();
    }
}
Run Code Online (Sandbox Code Playgroud)

执行时

myStream
    .groupByKey(Serialized.with(Serdes.String(), Event.serde()))
    .reduce((k, v) -> v)
    .transformValues(() -> valueTransformerWithKey)
    .toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));
Run Code Online (Sandbox Code Playgroud)

我希望标点符号产生的 Alarm 事件一旦过期就会被转发到 ALARM 主题。

相反,我得到了以下异常:不支持 ProcessorContext.forward()。

Aur*_*ien 6

像往常一样,我在关于ValueTransformerWithKey接口的 javadoc 中找到了答案:https : //kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

请注意,在转换中不允许使用 ProcessorContext.forward(Object, Object) 或 ProcessorContext.forward(Object, Object, To) 并且会导致异常。

但是,实现Transformer接口允许使用context.forward()。谢谢@Matthias J. Sax

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html

如果应该向下游转发多个输出记录,则可以使用 ProcessorContext.forward(Object, Object) 和 ProcessorContext.forward(Object, Object, To)。如果不应该向下游转发记录,则转换可以返回 null。

  • 不幸的是,您无法避免下游重新分区 atm —— 这是 API 的一个缺口。 (2认同)