如何使用Spring Cloud Stream Kafka和每服务数据库实现微服务事件驱动架构

cod*_*ent 6 apache-kafka microservices spring-cloud spring-cloud-stream spring-kafka

我正在尝试实现事件驱动的体系结构来处理分布式事务.每个服务都有自己的数据库,并使用Kafka发送消息以通知其他微服务有关操作.

一个例子:

 Order service -------> | Kafka |------->Payment Service
       |                                       |
Orders MariaDB DB                   Payment MariaDB Database
Run Code Online (Sandbox Code Playgroud)

订单收到订单请求.它必须将新订单存储在其数据库中并发布消息,以便付款服务意识到它必须为该项目收费:

私人订单业务订单业务;

@PostMapping
public Order createOrder(@RequestBody Order order){
    logger.debug("createOrder()");
    //a.- Save the order in the DB
    orderBusiness.createOrder(order);
    //b. Publish in the topic so that Payment Service charges for the item.
    try{
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    }catch(Exception e){
        logger.error("{}", e);
    }
    return order;
}
Run Code Online (Sandbox Code Playgroud)

这些是我的疑惑:

  1. 步骤a.-(保存在订单DB中)和b.-(发布消息)应该在事务中以原子方式执行.我怎样才能做到这一点?
  2. 这与前一个相关:我发送消息:orderSource.output().send(MessageBuilder.withPayload(order).build()); 无论Kafka代理是否已关闭,此操作都是异步的,并且总是返回true.我如何知道该消息已经到达Kafka经纪人?

Sön*_*bau 9

步骤a.-(保存在订单DB中)和b.-(发布消息)应该在事务中以原子方式执行.我怎样才能做到这一点?

Kafka目前不支持事务(因此也没有回滚或提交),您需要同步这样的事情.简而言之:你不能做你想做的事.当KIP-98合并时,这将在近期未来发生变化,但这可能还需要一些时间.此外,即使在Kafka中进行事务处理,跨两个系统的原子事务也是一件非常困难的事情,随后的所有事情只会通过Kafka中的事务支持得到改善,它仍然无法完全解决您的问题.为此,您需要考虑在您的系统中实现某种形式的两阶段提交.

您可以通过配置生产者属性来稍微接近,但最终您必须为您的一个系统(MariaDB或Kafka)选择至少一次最多一次.

让我们从您在Kafka可以做的事情开始,确保传递信息,然后我们将深入了解整个流程的选项以及结果.

保证交货

您可以配置多少券商必须确认收到您的邮件,请求返回之前,你与参数的ack:此设置为所有你告诉经纪人要等到所有的副本有一个答案返回给您之前收到你的消息.这仍然不能100%保证您的消息不会丢失,因为它只是已经写入页面缓存而且存在理论情况,其中代理在持久存储到光盘之前失败,其中消息可能仍然丢失.但这是一个很好的保证,你会得到.您可以通过降低经纪人强制fsync到光盘(强调文本和/或flush.ms)的间隔来进一步降低数据丢失的风险,但请注意,这些值会带来严重的性能损失.

除了这些设置之外,您还需要等待Kafka生产者将请求的响应返回给您,并检查是否发生了异常.这与你问题的第二部分有关,所以我将进一步深入研究.如果响应是干净的,您可以确定您的数据到达Kafka并开始担心MariaDB.

到目前为止我们所涵盖的所有内容仅涉及如何确保Kafka收到您的消息,但您还需要将数据写入MariaDB,这也可能会失败,这将使您有必要回忆一下您可能已发送给Kafka的消息 - 这是你不能做到的.

所以基本上你需要选择一个系统,你可以更好地处理重复/缺失值(取决于你是否重新发送部分失败),这将影响你做事的顺序.

选项1

卡夫卡第一

在此选项中,您在MariaDB中初始化事务,然后将消息发送到Kafka,等待响应,如果发送成功,则在MariaDB中提交事务.如果向Kafka发送失败,您可以在MariaDB中回滚您的交易,一切都很花哨.但是,如果发送到Kafka成功并且由于某种原因您对MariaDB的提交失败,则无法从Kafka获取消息.因此,如果您稍后重新发送所有内容,您将在MariaDB中丢失消息或在Kafka中发送重复消息.

选项2

MariaDB首先

这几乎就是另一种方式,但您可能能够更好地删除使用MariaDB编写的消息,具体取决于您的数据模型.

当然,您可以通过跟踪失败的发送并稍后重试这两种方法来缓解这两种方法,但所有这些都是更大问题上的绑定.

我个人会采用方法1,因为提交失败的可能性应该小于发送本身,并在卡夫卡的另一边实施某种欺骗检查.


这与前一个相关:我发送消息:orderSource.output().send(MessageBuilder.withPayload(order).build()); 无论Kafka代理是否已关闭,此操作都是异步的,并且总是返回true.我如何知道该消息已经到达Kafka经纪人?

首先,我承认我不熟悉Spring,所以这可能对您没用,但下面的代码片段说明了检查异常的产生响应的一种方法.通过调用flush阻止,直到所有发送完成(并且失败或成功),然后检查结果.

Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();

for(MessageType message : messages){
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

producer.flush();

if (!exceptionList.isEmpty()) {
  // do stuff
}
Run Code Online (Sandbox Code Playgroud)