jef*_*uan 9 apache-kafka kafka-consumer-api apache-kafka-streams
对于Kafka Streams,如果我们使用较低级别的处理器API,我们可以控制是否提交.因此,如果我们的代码中出现问题,并且我们不想提交此消息.在这种情况下,Kafka将多次重新发送此消息,直到问题得到解决.
但是如何控制在使用其更高级别的流DSL API时是否提交消息?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
Mat*_*Sax 14
你的陈述并不完全正确.你不能 "控制提交或不提交" - 至少不能直接(在处理器API和DSL中).您只能用于ProcessorContext#commit()请求其他提交.因此,在调用#commit()Streams尝试尽快提交后,但它不是立即提交.此外,即使您从未打过电话,Streams也会自动提交#commit().您可以通过Streams配置控制Streams提交间隔commit.interval.m(参见http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)
如果出现"问题",则取决于您如何回应问题的类型:
Processor#process()或KeyValueMapper#apply()之前,你可以成功处理当前的消息(注意,你可能会遇到超时,即异常,使用此策略 - 参见消费者配置heartbeat.interval.ms和0.10.1 session.timeout.ms [KIP-62])StateStore并稍后处理它们.但是,很难做到正确并且还打破了一些Streams假设(例如,处理顺序).不建议使用,如果使用,您必须非常仔细地考虑其含义如果存在未捕获的异常,StreamThread则会死亡并且不会发生任何提交(您可以注册异常处理程序以获得有关此通知:http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-在你的应用程序代码中流.如果你全部StreamThread死亡,你将需要创建一个新的实例KafkaStreams来重启你的应用程序.
在成功处理消息之前,您不能从用户代码返回,因为如果您返回,Streams会假定消息已成功处理(因此可能会提交相应的偏移量).关于项目符号(3),将记录放入特殊的StateStore以便以后处理被认为是"成功"处理的记录.
| 归档时间: |
|
| 查看次数: |
3225 次 |
| 最近记录: |