使用kafka和jpa时的好习惯

zib*_*ibi 15 java jpa apache-kafka spring-boot

我目前正在使用JPA和Kafka的项目.我正在尝试找到一组合并这些操作的良好实践.

在现有代码中,生产者在与jpa相同的事务中使用,但是,根据我的阅读,似乎他们不共享事务.

@PostMapping
@Transactional
public XDto createX(@RequestBody XRequest request) {
    Xdto dto = xService.create(request);
    kafkaProducer.putToQueue(dto, Type.CREATE);
    return dto;
}
Run Code Online (Sandbox Code Playgroud)

其中kafka生产者的定义如下:

public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Type> template;

    public void putToQueue(Dto dto, Type eventType) {
        template.send("event", new Event(dto, eventType));
    }
}
Run Code Online (Sandbox Code Playgroud)

这是组合jpa和kafka的有效用例,是否正确定义了事务边界?

gag*_*ngh 7

当事务失败时,这不会按预期工作.kafka互动不是交易的一部分.

您可能想要查看TransactionalEventListener您可能希望在AFTER_COMMIT事件上将消息写入kafka.即使这样,kafka发布也可能失败.

另一个选择是使用jpa写入db.让debezium从您的数据库中读取更新的数据并将其推送到kafka.该活动将采用不同的格式,但更丰富.


pus*_*har 5

通过查看您的问题,我假设您正在尝试实现OLTP系统的CDC(更改数据捕获),即将所有更改记录到事务数据库中。有两种方法可以解决此问题。

  1. 应用程序代码对事务数据库和Kafka进行双重写入。它不一致并影响性能。这是不一致的,因为当您对两个独立的系统进行双重写入时,如果其中一个写入失败,则数据会被破坏,并且在事务流中将数据推送到Kafka会增加延迟,您不想妥协。
  2. 从数据库提交中提取更改(数据库/应用程序级触发器或事务日志),并将其发送到Kafka。这是非常一致的,完全不会影响您的交易。一致,因为数据库提交日志是成功提交之后数据库事务的反映。有很多它的杠杆作用可以解决这种做法像的数据总线麦克斯韦debezium等。

如果您使用CDC,请尝试使用任何可用的解决方案。