如何使用Kafka Stream手动提交?

Gli*_*ide 12 apache-kafka apache-kafka-streams

有没有办法用Kafka Stream手动提交?

通常使用KafkaConsumer,我做类似下面的事情:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
       // process records
    }
   consumer.commitAsync();
}
Run Code Online (Sandbox Code Playgroud)

我手动调用提交的地方.我没有看到类似的API KStream.

Mat*_*Sax 27

提交由Streams内部和全自动处理,因此通常没有理由手动提交.请注意,Streams处理此方式与使用者自动提交不同 - 实际上,对内部使用的使用者禁用自动提交,而Streams"手动"管理提交.原因是,提交只能在处理期间的某些点发生,以确保没有数据丢失(关于更新状态和刷新结果存在许多内部依赖性).

对于更频繁的提交,您可以通过StreamsConfig参数减少提交间隔commit.interval.ms.

尽管如此,通过低级处理器API可以间接进行手动提交.您可以使用context通过init()方法提供的对象来调用context#commit().请注意,这只是"请求Streams"尽快提交 - 它不是直接发出提交.

  • 不在应用程序内。根据您的应用程序遇到问题的时间,您可以使用“DeserializationExceptionHandler”:https://docs.confluence.io/current/streams/developer-guide/config-streams.html#default-deserialization-exception-handler --或者您也许能够捕获异常并“吞掉”它。 (3认同)