Gli*_*ide 12 apache-kafka apache-kafka-streams
有没有办法用Kafka Stream手动提交?
通常使用KafkaConsumer,我做类似下面的事情:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
// process records
}
consumer.commitAsync();
}
Run Code Online (Sandbox Code Playgroud)
我手动调用提交的地方.我没有看到类似的API KStream.
Mat*_*Sax 27
提交由Streams内部和全自动处理,因此通常没有理由手动提交.请注意,Streams处理此方式与使用者自动提交不同 - 实际上,对内部使用的使用者禁用自动提交,而Streams"手动"管理提交.原因是,提交只能在处理期间的某些点发生,以确保没有数据丢失(关于更新状态和刷新结果存在许多内部依赖性).
对于更频繁的提交,您可以通过StreamsConfig参数减少提交间隔commit.interval.ms.
尽管如此,通过低级处理器API可以间接进行手动提交.您可以使用context通过init()方法提供的对象来调用context#commit().请注意,这只是"请求Streams"尽快提交 - 它不是直接发出提交.
| 归档时间: |
|
| 查看次数: |
7160 次 |
| 最近记录: |