使用Kafka Streams DSL时如何处理错误并且不提交

jef*_*uan 9 apache-kafka kafka-consumer-api apache-kafka-streams

对于Kafka Streams,如果我们使用较低级别的处理器API,我们可以控制是否提交.因此,如果我们的代码中出现问题,并且我们不想提交此消息.在这种情况下,Kafka将多次重新发送此消息,直到问题得到解决.

但是如何控制在使用其更高级别的流DSL API时是否提交消息?

资源:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

Mat*_*Sax 14

你的陈述并不完全正确.你不能 "控制提交或不提交" - 至少不能直接(在处理器API和DSL中).您只能用于ProcessorContext#commit()请求其他提交.因此,在调用#commit()Streams尝试尽快提交后,但它不是立即提交.此外,即使您从未打过电话,Streams也会自动提交#commit().您可以通过Streams配置控制Streams提交间隔commit.interval.m(参见http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)

如果出现"问题",则取决于您如何回应问题的类型:

  • 如果你发现一个无法恢复的问题,你只能抛出异常并"停止世界"(参见下文).
  • 如果你有一个可恢复的错误,你需要在自己的代码中"循环"(例如,在问题解决之前Processor#process()KeyValueMapper#apply()之前,你可以成功处理当前的消息(注意,你可能会遇到超时,即异常,使用此策略 - 参见消费者配置heartbeat.interval.ms和0.10.1 session.timeout.ms [KIP-62])
  • 另一种方法是将现在无法处理的记录放入StateStore并稍后处理它们.但是,很难做到正确并且还打破了一些Streams假设(例如,处理顺序).不建议使用,如果使用,您必须非常仔细地考虑其含义

如果存在未捕获的异常,StreamThread则会死亡并且不会发生任何提交(您可以注册异常处理程序以获得有关此通知:http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-在你的应用程序代码中流.如果你全部StreamThread死亡,你将需要创建一个新的实例KafkaStreams来重启你的应用程序.

在成功处理消息之前,您不能从用户代码返回,因为如果您返回,Streams会假定消息已成功处理(因此可能会提交相应的偏移量).关于项目符号(3),将记录放入特殊的StateStore以便以后处理被认为是"成功"处理的记录.

  • 你可以将 `commit.internal.ms` 配置设置为 `Long.MAX_VALUE`——这将有效地避免 Kafka Streams 自动提交,但只有在你调用 `context#commit()` 之后。 (2认同)
  • 无论如何,KS都会默认将“ enable.auto.commit”设置为“ false”。但是,KS具有其自己的提交逻辑(即,它在每个`commit.interval.ms`内部“手动”调用Consumer#commit()):因此,从消费者的角度来看,我们使用手动提交,从Kafka Streams的角度来看,它仍然是自动的,并且没有Kafka Streams配置可禁用它。 (2认同)