Kafka KStream - 使用带窗口的AbstractProcessor

eth*_*nny 1 java apache-kafka apache-kafka-streams

我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储.

我期待.punctuate()大约每30秒看一次.我得到的是保存在这里.

(原始文件长达数千行)

总结 - .punctuate()看似随机然后反复调用.它似乎不符合通过ProcessorContext.schedule()设置的值.


编辑:

另一个相同代码的运行.punctuate()大约每四分钟产生一次调用.这次我没有看到疯狂的重复值.来源没有变化 - 只是结果不同.

使用以下代码:

主要

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();
Run Code Online (Sandbox Code Playgroud)

处理器

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;

       values = new ArrayList<>();

    }

    @Override
    public void process(String s, String s2) {
        LOGGER.debug("batched processor s:{}   s2:{}", s, s2);

        values.add(s2);
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");

        super.init(context);

        values.clear();

        this.context = context;
        context.schedule(delay);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);

        LOGGER.info("punctuate   ts: {}   count: {}", timestamp, values.size());

        context().commit();
    }
}
Run Code Online (Sandbox Code Playgroud)

ProcessorSupplier

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000);
        } catch(Exception exception) {
            LOGGER.error("Unable to instantiate BatchProcessor()", exception);
            throw new RuntimeException();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

编辑:

为了确保我的调试器没有减慢速度,我构建它并在与我的kafka进程相同的盒子上运行它.这次它甚至没有试图延迟4分钟或更长时间 - 在几秒钟内就输出了虚假的电话.punctuate().其中许多(大多数)没有干预.process().

Mat*_*Sax 6

更新:这部分答案是针对Kafka版本0.11或更早版本(对于Kafka 1.0及更高版本,见下文)

在卡夫卡流,标点符号是基于数据流的时间 系统时间(又名处理时间).

每个默认流时间事件时间,即嵌入在Kafka记录中的时间戳.当你不设置非默认TimestampExtractor(参见timestamp.extractorhttp://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters),来电来punctuate只取决于事件的过程关于您处理的记录的时间.因此,如果您需要多个分钟来处理记录的"30秒"(事件时间),punctuate则会比30秒(挂钟时间)更频繁地调用...

这也可以解释您的不规则呼叫模式(即突发和长延迟).如果您的数据事件时间"跳转",并且您的主题中已经完全可以处理您要处理的数据,那么Kafka Streams也会"跳转"内部维护的流时间.

我认为,您可以通过使用来解决您的问题WallclockTimestampExtractor(请参阅http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor)

还有一点需要提及:流时间只有在处理数据才会提前 - 如果您的应用程序到达输入主题的末尾并等待数据,punctuate则不会被调用.即使您使用,这也适用WallclockTimestampExtractor.

顺便说一句:目前有关于Streams标点符号行为的讨论:https://github.com/apache/kafka/pull/1689

回答Kafka 1.0及更高版本

从Kafka 1.0开始,可以根据挂钟时间或事件时间来注册标点符号:https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2