eth*_*nny 7 java apache-kafka apache-kafka-streams
是否可以使用Kafka Streams应用程序运行主题中的所有数据然后退出?
示例我根据日期生成主题数据.消费者被cron开除,运行所有可用数据,然后......做什么?我不希望它等待更多数据.只是假设它就在那里,然后优雅地退出.
可能?
在Kafka Streams中(和其他流处理解决方案一样),它不是"数据结束",因为它首先是流处理 - 而不是批处理.
不过,您可以观察Kafka Streams应用程序的"滞后",如果没有滞后(滞后,是尚未消耗的消息数),请将其关闭.
例如,您可以使用bin/kafka-consumer-groups.sh检查Streams应用程序的延迟(应用程序ID用作使用者组ID).如果要将其嵌入Streams应用程序中,可以使用kafka.admin.AdminClient获取消费者组信息.
您可以创建一个consumer,然后一旦它停止提取数据,您就可以调用consumer.close(). 或者,如果您想将来再次进行投票,只需稍后consumer.pause()再致电即可.resume。
实现此目的的一种方法是在消费者轮询块内。例如
data = consumer.poll()
if (!data.next()) {
consumer.close()
}
Run Code Online (Sandbox Code Playgroud)
请记住poll返回ConsumerRecord<K,V>并符合接口Iterable。
| 归档时间: |
|
| 查看次数: |
2072 次 |
| 最近记录: |