我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储.
我期待.punctuate()大约每30秒看一次.我得到的是保存在这里.
(原始文件长达数千行)
总结 - .punctuate()看似随机然后反复调用.它似乎不符合通过ProcessorContext.schedule()设置的值.
另一个相同代码的运行.punctuate()大约每四分钟产生一次调用.这次我没有看到疯狂的重复值.来源没有变化 - 只是结果不同.
使用以下代码:
StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
lines.process(new BPS2());
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
Run Code Online (Sandbox Code Playgroud)
public class BP2 extends AbstractProcessor<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);
private ProcessorContext context;
private final long delay;
private final ArrayList<String> values;
public BP2(long delay) {
LOGGER.debug("BatchProcessor() constructor");
this.delay = delay; …Run Code Online (Sandbox Code Playgroud)