在SinkTask中完成"put"后,立即使用Kafka Connect HOWTO"提交偏移量"

Cha*_*ntz 3 flush offset apache-kafka apache-kafka-connect

我正在使用Kafka Connect从Kafka Broker(v0.10.2)获取消息,然后将其同步到下游服务.

目前我有代码SinkTask#put处理,SinkRecord然后将其持久保存到下游服务.

几个关键要求,

  1. 我们需要确保消息持久保存到下游服务ATLEAST一次.
  2. 如果下游服务抛出错误或说它没有处理消息,那么我们需要确保再次重新读取消息.

因此,我们认为我们可以依赖于SinkTask#flush有效地退出为接收消息的特定轮询/周期提交偏移.通过抛出一个异常或者某个东西来告诉Connect不提交偏移量,但是在下次轮询中重试.

但正如我们发现的那样flush,实际上是基于时间的,并且或多或少地独立于民意调查之外,它会在达到特定时间阈值时提交抵消.

在0.10.2 SinkTask#preCommit中引入,所以我们认为我们可以将它用于我们的目的.但是在文档中没有提到SinkTask#put&之间存在1:1的关系SinkTask#preCommit.

因为基本上我们想commit offsets尽快一个put succeeds.同样地,如果特定失败,则不提交偏移量put.

如果没有通过,如何实现这一目标SinkTask#preCommit

Ran*_*uch 10

正确地将数据输入和输出Kafka可能具有挑战性,并且Kafka Connect使其变得更容易,因为它使用了最佳实践并隐藏了许多复杂性.对于接收器连接器,Kafka Connect从主题读取消息,将它们发送到连接器,然后定期提交已读取和处理的各个主题分区的最大偏移量.

请注意,"将它们发送到连接器"对应于该put(Collection<SinkRecord>)方法,并且在Kafka Connect提交偏移之前可能会多次调用此方法.您可以控制Kafka Connect提交偏移的频率,但Kafka Connect确保只有当连接器成功处理该消息时,它才会为消息提交偏移量.

当连接器名义上运行时,一切都很好,连接器会看到每条消息一次,即使定期提交偏移也是如此.但是,如果连接器发生故障,那么当它重新启动时,连接器将从最后提交的偏移量开始.这可能意味着您的连接器会看到它在崩溃之前处理的一些相同消息.如果您仔细编写连接器至少具有一次语义,这通常不是问题.

为什么Kafka Connect会定期提交偏移而不是每条记录?因为它节省了大量的工作,并且在名义上的事情发生时并不重要.只有当出现问题时,偏移滞后才会起作用.即便如此,如果你有Kafka Connect处理偏移量,你的连接器需要准备好至少处理一次消息.一次是可能的,但你的连接器必须做更多的工作(见下文).

写记录

在编写连接器时你有很大的灵活性,这很好,因为很多东西将取决于它所编写的外部系统的功能.让我们来看看如何实现不同的方式putflush.

如果系统支持事务或可以处理一批更新,则连接器put(Collection<SinkRecord>)可以使用单个事务/批处理写入该集合中的所有记录,根据需要重试多次,直到事务/批处理完成或最终抛出错误为止.在这种情况下,put所有工作都将成功或将失败.如果成功,那么Kafka Connect知道所有记录都已正确处理,因此可以(在某些时候)提交偏移量.如果您的put呼叫失败,则Kafka Connect假定不知道是否处理了任何记录,因此它不会更新其偏移量并停止连接器.您的连接器flush(...)无需任何操作,因为Kafka Connect正在处理所有偏移.

如果系统不支持事务,而您只能一次提交一个项目,则可能是连接器put(Collection<SinkRecord>)尝试单独写出每个记录,阻塞直到成功并在发出错误之前根据需要重试每个记录.同样,put所有的工作和flush方法可能都不需要做任何事情.

到目前为止,我的例子完成了所有的工作put.您总是可以选择put简单地缓冲记录,而是完成在或中写入外部服务的所有工作.你可能这样做的一个原因是你写的是基于时间的,就像和.如果您不希望您的写入是基于时间的,您可能不希望写入或写入.flushpreCommitflushpreCommitflushpreCommit

记录偏移或不记录

如上所述,默认情况下,Kafka Connect会定期记录偏移量,以便在重新启动时连接器可以从最后一次停止的位置开始.

然而,有时希望连接器记录外部系统中的偏移,特别是当可以原子方式完成时.当这样的连接器启动时,它可以查看外部系统以找出上次写入的偏移量,然后可以告诉Kafka Connect它想要开始读取的位置.使用这种方法,您的连接器可以完成一次消息处理.

当接收器连接器执行此操作时,它们实际上不需要Kafka Connect来提交任何偏移.该flush方法只是让您的连接器知道Kafka Connect为您提供哪些偏移的机会,并且由于它不返回任何内容,因此无法修改这些偏移或告诉Kafka Connect连接器正在处理哪些偏移.

这是preCommit方法的用武之地.它实际上是一个替代flush(它实际上需要相同的参数flush),除了它应该返回Kafka Connect应该提交的偏移量.默认情况下,preCommit只调用flush然后返回传递给preCommit它的相同偏移量,这意味着Kafka Connect应该提交它传递给连接器的所有偏移量preCommit.但是如果你preCommit返回一组空的偏移,那么Kafka Connect将根本不记录任何偏移.

因此,如果您的连接器将处理外部系统中的所有偏移并且不需要Kafka Connect记录任何内容,那么您应该覆盖该preCommit方法而不是flush,并返回一组空的偏移.