bit*_*ttu 6 apache-kafka kafka-consumer-api
在实施 kafka 消费者时应该采取什么更好的方法。
目标从 Kafka 读取并写回 db。数百万行
方法 1:每个分区 - 每个消费者 - 等待消息消耗(即写回数据库)然后在轮询循环中继续下一步。
方法 2:每个分区 - 每个消费者 - 将记录发送到工作线程或线程池以写回数据库,然后提交偏移量并继续轮询。需要注意抵消管理。在这种情况下,不要等待消息写回数据库。继续轮询,将消息传递给工作线程。
关于他们两个的任何见解?
谢谢
小智 7
方法一: 该方法仅适用于您可以估计消息处理时间的情况,否则不推荐使用。
问题:在这种方法中,主要问题是让消费者保持活动状态,如果您在再次调用 poll() 之前等待消息完全处理,则必须确保您的消费者在调用 poll() 之前应该处于活动状态,因为 kafka维护一个名为“session.timeout.ms”的属性。kafka broker/cluster 对这个属性的值采取行动,如果消费者在“session.timeout.ms”的时间段内无法再次调用 poll(),broker 将标记消费者死亡并将其踢出. 现在,当消费者完成消息处理并再次调用 poll() 时,它被视为新加入者,并将再次像以前一样从偏移量开始提供记录集。牢记这种情况,消费者将陷入无限循环,永远不会继续其偏移量。
可能的解决方案 1:要使用此方法,您需要具有以下副作用的以下属性“session.timeout.ms”的良好值:
1:值太低:消费者将被标记为死亡,如上所述,并且永远不会继续其偏移量,但是消息将被处理,但每次完成消息时,它将再次获得以前的消息+新消息。
2:值太高:Broker 检测消费者的真正故障会很晚,这将导致记录重复并影响整体吞吐量。
可能的解决方案 2:(仅对版本 0.10.1.x 有效)Kafka 在发布 (0.10.1.0) 中的官方修复。在这种方法中,引入了两个值得注意的实体:一个新属性“max.poll.interval.ms”,它设置了客户端调用 poll() 之间的最大延迟,以及一个负责使消费者保持活动状态的后台线程。因此,在某个场景中,当消费者调用方法 poll() 然后忙于消息处理时,内部后台线程将使心跳保持活动状态,因此消费者将保持活动状态。但是,这个内部后台线程本身将保持活动状态,直到属性“max.poll.interval.ms”的超时值保持有效。因此,该线程将等待消费者在“max.poll.interval.ms”的时间段内调用 poll() ,否则,它将发送离开请求并自行死亡。”
该解决方案中的棘手部分再次是找到此属性的合适值:“max.poll.interval.ms”(非常重要,这将是后台线程将保持心跳活动而无需显式调用 poll())。
方法 2:使用工作线程是一个好主意,但是您必须维护一个内部队列或对接收到的消息进行验证,这可能很复杂,而且您还需要针对自动提交使用手动提交。有关提交的更多信息,请参阅此内容并搜索标题“提交和偏移量”。
问题:在这种方法中,主要问题是跟踪收到的消息和成功处理的消息。因为,您的消费者将收到消息,它将消息传递给相应的工作线程,并将提交偏移量并继续接收更多消息。在此过程中,您必须注意以下问题:
解决方案:可以有不同的方法来解决上述问题,一种方法是使用内部队列来保留消息和手动提交,只有当工作线程报告消息处理成功时才会发送。然而,需要非常仔细的实现,因为它可能导致复杂的代码,也可能导致内存管理或线程问题。
建议:根据您的要求,您可以使用一种方法或另一种方法来解决上述可能的问题。但是我会推荐一个更强大的解决方案是使用分区暂停/恢复。以非常抽象的方式,您的消费者应该执行以下步骤:
1:poll() 用于消息。
2:暂停所有相应的主题/分区。
3:将消息分配给工作线程并等待它们的处理。
4:继续调用 poll() 但当分区暂停时,不会收到额外的消息,而消费者将保持活动状态。(确保此时没有注册新主题)
5:如果所有工作线程都应报告消息处理成功/失败,则相应地提交偏移量。
6:恢复所有分区。
注意:根据您的场景和要求,可能有更好的方法或其他可能的解决方案。这只是一个想法或可能的解决方案之一。