Spring Kafka中的事务同步

Eik*_*nds 7 spring-transactions spring-kafka

我想将kafka事务与存储库事务同步:

@Transactional
public void syncTransaction(){
  myRepository.save(someObject)
  kafkaTemplate.send(someEvent)
}
Run Code Online (Sandbox Code Playgroud)

由于合并(https://github.com/spring-projects/spring-kafka/issues/373)和根据文档,这是可能的。但是,我在理解和实现该功能时遇到了问题。查看https://docs.spring.io/spring-kafka/reference/htmlsingle/#_transaction_synchronization中的示例,我必须创建一个MessageListenerContainer来侦听我自己的事件。我还需要使用KafkaTemplate发送事件吗?MessageListenerContainer是否禁止发送到代理?

如果我正确理解kafkaTemplate和kafkaTransactionManager必须使用同一生产者工厂,则必须在其中启用Transaction设置transactionIdPrefix。在我的示例中,我必须将messageListenerContainer的TransactionManager设置为DataSourceTransactionManager。那是对的吗?

从我的角度来看,我通过kafkaTemplate发送事件,听我自己的事件并再次使用kafkaTemplate转发事件看起来很奇怪。

如果我能得到一个简单的示例,说明如何将kafka事务与存储库事务进行同步,并提供解释,我将为我提供帮助。

Gar*_*ell 8

如果侦听器容器配备了KafkaTransactionManager,则容器将创建一个生产者,该生产者将由任何下游kafka模板使用,并且该容器会将偏移量发送给您。

如果容器具有其他事务管理器,则该容器将无法发送偏移量,因为它无权访问生产者(或模板)。

另一个解决方案是使用@Transactional(使用数据源TM)注释您的方法,并使用kafka TM配置容器。

这样,您的DB tx将在线程返回到容器之前提交,然后容器将偏移量发送到kafka事务并提交。

有关示例,请参见框架测试用例

  • “这样,您的 DB tx 将在线程返回到容器之前提交,然后容器会将偏移量发送到 kafka 事务并提交它。” => 但是 DB 提交和 Kafka 提交真的是一个原子操作吗?听起来像是两笔交易相继执行,但不像一笔交易。 (2认同)
  • 是的,但事务同步也是如此——它在 [Dr. Dave Syer 出色的 Javaworld 文章“Spring 中的分布式事务,有和没有 XA”](https://www.javaworld.com/article/2077963/open-source-tools/distributed-transactions-in-spring--with-and -没有-xa.html)。Kafka 不支持 XA,您必须处理 DB tx 可能会在 Kafka tx 回滚时提交的可能性。 (2认同)

nad*_*r.h 7

@Eike Behrends 有一个 db + kafka 事务,你可以这样使用ChainedTransactionManager和定义它:

@Bean
public KafkaTransactionManager kafkaTransactionManager() {
    KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());;
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}


@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
    return new JpaTransactionManager(em);
}

@Bean(name = "chainedTransactionManager")
public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                                                           KafkaTransactionManager kafkaTransactionManager) {
    return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
}
Run Code Online (Sandbox Code Playgroud)

您需要注释您的事务性 db+kafka 方法 @Transactional("chainedTransactionManager")

(你可以在 spring-kafka 项目上看到这个问题:https : //github.com/spring-projects/spring-kafka/issues/433

你说 :

从我的角度来看,我通过 kafkaTemplate 发送事件,监听我自己的事件并再次使用 kafkaTemplate 转发事件看起来很奇怪。

你试过这个吗?如果是这样,你能提供一个例子吗?