use*_*547 3 apache-kafka apache-kafka-connect
我正在尝试使用Kafka Sink Connector将数据批量发送到NOSQL数据库。我正在关注https://kafka.apache.org/documentation/#connect文档,并对必须在何处实现发送记录的逻辑感到困惑。请帮助我了解如何在内部处理记录以及必须使用Put()或Flush()批量处理记录的内容。
当Kafka Connect工作者运行接收器任务时,它将使用分配给该任务的主题分区中的消息。这样做时,它将通过该put(Collection<SinkRecord>)方法反复将一批消息传递到接收器任务。只要连接器及其任务正在运行,此操作就会继续。
Kafka Connect还将定期记录接收器任务的进度,即每个主题分区上最近处理的消息的偏移量。这称为提交偏移量,这样做是为了确保连接器意外停止且异常干净,Kafka Connect知道任务应在每个主题分区中的何处恢复处理消息。但是就在Kafka Connect将偏移量写入Kafka之前,Kafka Connect工作者为接收器连接器提供了通过此flush(...)方法在此阶段进行工作的机会。
特定的接收器连接器可能不需要执行任何操作(如果put(...)完成了所有工作),或者它可能借此机会将已经处理过的所有消息提交put(...)到数据存储。例如,Confluent的JDBC接收器连接器put(...)使用事务写入通过该方法传递的每批消息(其大小可以通过连接器的使用者设置来控制),因此该flush(...)方法不需要执行任何操作。另一方面,Confluent的ElasticSearch接收器连接器仅累积一系列put(...)方法的所有消息,并仅在期间将它们写入Elasticsearch flush(...)。
源连接器和接收器连接器提交偏移的频率由连接器的offset.flush.interval.ms配置属性。默认值是每60秒提交一次偏移,这不足以提高性能并减少开销,但是足够频繁以限制连接器任务意外终止时潜在的重新处理量。请注意,当连接器正常关闭或遇到异常时,Kafka Connect将始终有机会提交偏移量。只有当Kafka Connect工人意外死亡时,它才可能没有机会提交用于标识已处理哪些消息的偏移量。因此,只有在发生此类故障之后重新启动之后,连接器才可能重新处理它在故障之前所做的某些消息。这是因为消息应该至少一次出现,因此消息应该是幂等的。把这一切加上你的连接器的行为考虑此设置确定适当的值时。
有关更多示例和详细信息,请参阅Kafka Connect的Confluent文档以及开源接收器连接器。
| 归档时间: |
|
| 查看次数: |
1746 次 |
| 最近记录: |