相关疑难解决方法(0)

Kafka KStream - 使用带窗口的AbstractProcessor

我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储.

我期待.punctuate()大约每30秒看一次.我得到的是保存在这里.

(原始文件长达数千行)

总结 - .punctuate()看似随机然后反复调用.它似乎不符合通过ProcessorContext.schedule()设置的值.


编辑:

另一个相同代码的运行.punctuate()大约每四分钟产生一次调用.这次我没有看到疯狂的重复值.来源没有变化 - 只是结果不同.

使用以下代码:

主要

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();
Run Code Online (Sandbox Code Playgroud)

处理器

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay; …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1449
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1

java ×1