相关疑难解决方法(0)

如何从 ValueTransformer 中的 Punctuator 实例向下游转发事件?

在 KafkaStream 中,在实现ValueTransformerValueTransformerWithKey 时,在调用transform()时,我安排了一个新的 Punctuator。当执行Punctuator 的punctuate()方法时,我希望它使用上下文实例向下游转发事件。然而,当 DSL 拓扑的一部分时,上下文实例似乎没有定义。

关于如何使用 Transformer 执行此操作的任何线索?

在处理器中使用相同的逻辑,实现其工作的低级处理器拓扑。

在 ValueTransformerWithKey 中:

@Override 
    public Event transform(final String key, final Event event) { 
        this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
        return null;
}
Run Code Online (Sandbox Code Playgroud)

在 MyPunctuator 中:

private class MytPunctuator implements Punctuator {
    private String key;
    private ProcessorContext context;
    private Event event;

    MyPunctuator(ProcessorContext context, String key, Event event)
    {
        this.context = context;
        this.key = key;
        this.event = event;
    }

    @Override
    public …
Run Code Online (Sandbox Code Playgroud)

apache-kafka-streams

3
推荐指数
1
解决办法
1190
查看次数

标签 统计

apache-kafka-streams ×1