Laz*_* R. 2 java spring apache-kafka spring-kafka
总的来说,我对 Spring-Kafka/Kafka 还是有点陌生。我的问题相当简短。我有一个仅限消费者的应用程序,它不断地从 Kafka 读取、处理消息,并使用 Ack Listener 手动确认它们。我依赖于上游仅生产者的应用程序,其中它们负责将消息发送到 Kafka 主题以便我使用。我们最近实现了跨生产者和消费者的事务,但我想更多地了解故障点以及如何处理那些回滚的事务以免丢失?我读过,最好使用AfterRollbackProcessor而不是用于SeekToCurrentErrorHandlerkafka 容器工厂上的事务,并StatefulRetry设置为true。我使用事务的原因是为了在新版本中实现一次性 Kafka 语义,因为我们处理大量数据库持久性,并且由于数据库限制而无法承受重复事务。我想知道我是否@KafkaListener必须注释,@Transactional因为我在声明不应是这种情况之前读过一篇文章,但其他文章可能是这种情况,这就是我不确定的原因。我见过很多关于生产者和消费者应用程序的问题,但我还没有看到关于分别具有这些不同角色的单独应用程序的问题(即使最终可能是同一件事)。简而言之,我只是想知道将事务与 Kafka 合并时的最佳实践是什么,以及如何处理这种情况下的故障。
对于仅限消费者的应用程序来说,Kafka 事务是不必要的开销。事务仅在生成记录时才有用。
我使用事务是为了在新版本中实现一次性Kafka 语义,因为我们处理大量数据库持久性,并且由于数据库限制而无法承受重复事务。
当涉及其他技术时,无法保证“恰好一次” 。恰好一次仅适用于
read->process->write
Run Code Online (Sandbox Code Playgroud)
读写都是Kafka的场景。这是一个常见的误解。
此外,即使仅使用 kafka 读/处理/写,“仅一次”语义也仅适用于整个shebang。即,仅当写入成功时才会提交读取的偏移量。
该process步骤将获得至少一次语义,因此每当您在流程步骤中的其他地方编写时,您都需要重复数据删除逻辑,无论是否有 Kafka 写入步骤,并且(如果有 Kafka 写入)您只使用一次事务那里。
对于从 Kafka 读取数据并写入数据库而不写入 Kafka 的情况,@Transactional在侦听器上是正确的方法(使用重复数据删除逻辑来避免重复)。
对于您想要一次 Kafka 语义(读/处理/写)但又在处理步骤中写入数据库的情况,您可以ChainedKafkaTransactionManager在侦听器容器中使用 a ,以便数据库事务与 Kafka 事务同步(但仍然有对于数据库提交成功但 Kafka 事务失败的情况,有一个小窗口)。因此,即使如此,您仍然需要重复数据删除逻辑。在这种情况下,您不需要听众@Transactional。
编辑
仅限生产者有点不同;假设您要在事务中发布 10 条记录,您希望它们全部进入(提交)或退出(回滚)。那么你必须使用交易。
事务中生成的记录的使用者应该这样做isolation.level=read_committed,这样他们就不会看到未提交的写入(默认为read_uncommitted)。
如果您一次仅发布单个记录,并且不涉及其他事务资源,那么仅涉及 Kafka 时使用事务就没有什么意义。
但是,如果您从数据库或 JMS 等读取数据,然后写入 Kafka,您可能希望同步数据库和 Kafka 事务,但重复的概率仍然不为零;您如何处理取决于您提交事务的顺序。
通常,重复数据删除取决于应用程序。通常会使用应用程序数据中的某个键,因此,例如,SQL INSERT 语句的条件是数据库中尚不存在该键。
Kafka为每条记录提供了一个方便的唯一键,结合主题/分区/偏移量。您可以将它们与数据一起存储在数据库中以防止重复。
编辑2
SeekToCurrentErrorHandler(STCEH)通常在不使用事务时使用;当侦听器抛出异常时,错误处理程序会重置偏移量,以便在下一次轮询时重新获取记录。经过多次尝试后,我们放弃并调用“恢复器”,例如将DeadLetterPublishingRecoverer失败的记录写入另一个主题。
但是,它仍然可以用于交易。
错误处理程序在事务范围内(在回滚之前)调用,因此,如果它抛出异常(除非恢复程序“消耗”失败,否则它会抛出异常),事务仍然会回滚。如果恢复成功,事务将提交。
( AfterRollbackProcessorARP)是在恢复能力添加到STCEH之前添加的。它本质上与 STCEH 完全相同,但它在事务范围之外运行(在回滚之后)。
配置两者不会造成任何损害,因为如果 STCEH 已经执行了查找,则 ARP 将无事可做。
我仍然更喜欢使用带事务的 ARP 和不带事务的 STCEH - 如果只是为了获取日志消息的适当日志类别。可能还有其他的原因,我暂时想不起来。
请注意,现在 STCEH 和 ARP 都支持重试和退避,因此根本不需要配置侦听器级别的有状态重试。如果您想使用内存中重试而不导致往返代理以重新获取相同记录,则无状态重试可能仍然有用。
| 归档时间: |
|
| 查看次数: |
401 次 |
| 最近记录: |