在 KafkaStream 中,在实现ValueTransformer或ValueTransformerWithKey 时,在调用transform()时,我安排了一个新的 Punctuator。当执行Punctuator 的punctuate()方法时,我希望它使用上下文实例向下游转发事件。然而,当 DSL 拓扑的一部分时,上下文实例似乎没有定义。
关于如何使用 Transformer 执行此操作的任何线索?
在处理器中使用相同的逻辑,实现其工作的低级处理器拓扑。
在 ValueTransformerWithKey 中:
@Override
public Event transform(final String key, final Event event) {
this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
return null;
}
Run Code Online (Sandbox Code Playgroud)
在 MyPunctuator 中:
private class MytPunctuator implements Punctuator {
private String key;
private ProcessorContext context;
private Event event;
MyPunctuator(ProcessorContext context, String key, Event event)
{
this.context = context;
this.key = key;
this.event = event;
}
@Override
public …Run Code Online (Sandbox Code Playgroud)