Eik*_*nds 7 spring-transactions spring-kafka
我想将kafka事务与存储库事务同步:
@Transactional
public void syncTransaction(){
myRepository.save(someObject)
kafkaTemplate.send(someEvent)
}
Run Code Online (Sandbox Code Playgroud)
由于合并(https://github.com/spring-projects/spring-kafka/issues/373)和根据文档,这是可能的。但是,我在理解和实现该功能时遇到了问题。查看https://docs.spring.io/spring-kafka/reference/htmlsingle/#_transaction_synchronization中的示例,我必须创建一个MessageListenerContainer来侦听我自己的事件。我还需要使用KafkaTemplate发送事件吗?MessageListenerContainer是否禁止发送到代理?
如果我正确理解kafkaTemplate和kafkaTransactionManager必须使用同一生产者工厂,则必须在其中启用Transaction设置transactionIdPrefix。在我的示例中,我必须将messageListenerContainer的TransactionManager设置为DataSourceTransactionManager。那是对的吗?
从我的角度来看,我通过kafkaTemplate发送事件,听我自己的事件并再次使用kafkaTemplate转发事件看起来很奇怪。
如果我能得到一个简单的示例,说明如何将kafka事务与存储库事务进行同步,并提供解释,我将为我提供帮助。
如果侦听器容器配备了KafkaTransactionManager,则容器将创建一个生产者,该生产者将由任何下游kafka模板使用,并且该容器会将偏移量发送给您。
如果容器具有其他事务管理器,则该容器将无法发送偏移量,因为它无权访问生产者(或模板)。
另一个解决方案是使用@Transactional(使用数据源TM)注释您的方法,并使用kafka TM配置容器。
这样,您的DB tx将在线程返回到容器之前提交,然后容器将偏移量发送到kafka事务并提交。
有关示例,请参见框架测试用例。
@Eike Behrends 有一个 db + kafka 事务,你可以这样使用ChainedTransactionManager和定义它:
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());;
ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return ktm;
}
@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
return new JpaTransactionManager(em);
}
@Bean(name = "chainedTransactionManager")
public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
KafkaTransactionManager kafkaTransactionManager) {
return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
}
Run Code Online (Sandbox Code Playgroud)
您需要注释您的事务性 db+kafka 方法 @Transactional("chainedTransactionManager")
(你可以在 spring-kafka 项目上看到这个问题:https : //github.com/spring-projects/spring-kafka/issues/433)
你说 :
从我的角度来看,我通过 kafkaTemplate 发送事件,监听我自己的事件并再次使用 kafkaTemplate 转发事件看起来很奇怪。
你试过这个吗?如果是这样,你能提供一个例子吗?