ASP.NET Web Api的事件发布者

Rob*_*ert 4 c# asp.net domain-driven-design asp.net-web-api microservices

我已经开始使用微服务了,我需要创建一个事件发布机制.

我计划使用Amazon SQS.

这个想法很简单.我将事件存储在与聚合相同的事务中的数据库中.如果用户将更改其电子邮件,则事件UserChangedEmail将存储在数据库中.

我也有事件处理程序,例如UserChangedEmailHandler,(在这种情况下)将负责将此事件发布到SQS队列,因此其他服务可以知道用户更改了电子邮件.

我的问题是,实现这一目标的做法是什么?我是否应该有某种后台定时进程来扫描事件表并将事件发布到SQS?这可以是WebApi应用程序中的流程(首选),还是应该是一个单独的流程?

其中一个想法是使用Hangfire,但它不支持一分钟内的cron作业.

有什么建议?

编辑:

正如其中一个答案中所建议的,我已经查看了NServicebus.NServiceBus页面上的一个示例显示了我关注的核心.

在他们的示例中,他们创建了一个已放置订单的日志.如果成功提交日志或数据库条目,但发布中断和事件永远不会发布怎么办?

这是事件处理程序的代码:

public class PlaceOrderHandler :
    IHandleMessages<PlaceOrder>
{
    static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
    IBus bus;

    public PlaceOrderHandler(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(PlaceOrder message)
    {
        log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
        log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");

        var orderPlaced = new OrderPlaced
        {
            OrderId = message.Id
        };
        bus.Publish(orderPlaced); <!-- my concern
    }
}
Run Code Online (Sandbox Code Playgroud)

Chr*_*mon 9

关于货架建议

我推荐现成的产品,而不是自己推销产品,因为这里有很多复杂性,一开始就不会显而易见,例如:

  • 管理事件订阅者列表 - SQS队列更适合与事件使用者配对,而不是与事件生产者配对,因为消息消耗时它不再可用于队列 - 因此,如果您想为给定事件支持多个订阅者(这是事件驱动架构的巨大好处),您如何知道在首次引发事件消息时将哪些SQS队列推送到哪个队列?
  • 重试语义,错误转发队列 - 处理由于临时基础结构问题导致的临时错误与由于业务逻辑语义问题导致的永久性错误
  • 在何处发送消息并在何处发送消息的审计跟踪
  • 通过SQS发送的消息的安全性(您的业务案例是否需要加密?SQS是亚马逊提供的应用服务,不提供存储级加密
  • 消息大小 - SQS具有消息大小限制,因此您最终可能需要处理大消息的带外传输

而这仅仅是我的头脑......

一些现成的系统可以提供帮助:

  • NServiceBus提供了一个管理命令和事件消息传递的框架,它有一个允许灵活传输类型的插件框架 - NServiceBus.SQS提供SQS作为传输.
    • 提供全面而灵活的重试,审计和错误处理
    • 意见使用命令与事件(命令消息说"执行此操作"并发送到单个服务进行处理,事件消息说"发生了事情"并发送给任意数量的灵活订阅者)
    • 发件箱模式提供事务一致的消息传递,即使是非事务一致的传输,例如SQS
    • 目前,SQS插件使用默认的NServiceBus订阅者持久性,这需要SQL Server来存储事件订阅者列表(有关利用SNS的选项,请参阅下文)
    • 内置对sagas的支持,提供一个框架,通过补偿操作确保多事务最终一致性和回滚
    • 超时支持预定的消息处理
    • 商业产品,所以不是免费的,但许多插件/扩展是开源的
  • 轨道交通
    • 不支持现成的SQS,但支持Azure Service Bus和RabbitMq,如果可以选择,可以替代它
    • 与NServiceBus类似的产品,但不是100%相同 - NServiceBus与MassTransit提供全面的比较
    • 完全开源/免费
  • 只是说
    • 轻量级开源消息传递框架,专为基于SQS/SNS而设计
    • 每个事件的SNS主题,每个微服务的SQS队列,使用本机SNS SQS队列描述来实现扇出
    • 开源免费

可能还有其他人,我对NServiceBus有很多个人经验,但我强烈建议您研究现成的解决方案 - 他们会让您开始根据业务事件开始设计您的系统,而不是担心事件传播.

即使您确实希望将自己构建为学习练习,也可以查看上述工作如何为您提供可靠的事件驱动消息传递所需的一些提示.

交易一致性和发件箱模式

已编辑该问题以询问如果操作的某些部分成功,但发布操作失败会发生什么.我已经看到这被称为消息传递的事务一致性,它通常意味着在事务中,所有业务副作用都被提交,或者没有.商业副作用可能意味着:

  • 数据库记录已更新
  • 另一个数据库记录已删
  • 消息发布到消息队列
  • 邮件已发送

如果数据库操作失败,通常不希望发送电子邮件或发布消息,同样,如果消息发布失败,则不希望提交数据库操作.

那么如何确保消息的一致性呢?

NServiceBus通过以下两种方式之一处理此问题:

  1. 使用事务一致的消息传输,例如MSMQ.
    1. MSMQ能够使用Microsoft的DTC(分布式事务协调器),DTC可以在SQL服务器更新的分布式事务中注册消息的发布 - 这意味着如果您的业务事务失败,您的发布操作将被回滚,反之亦然
  2. 发件箱模式
    1. 使用发件箱模式,不立即调度消息 - 它们被添加到数据库中的发件箱表中,理想情况下是与业务数据相同的数据库,作为同一事务的一部分
    2. 提交事务后,它会尝试分派每条消息,并仅在成功分派时将其从发件箱中删除
    3. 如果在发送之后但在删除之前系统发生故障,则将再次发送该消息.为了弥补这一点,当启用发件箱时,NServiceBus还将通过维护所有入站消息的记录并丢弃重复项来对入站消息进行重复数据删除.
    4. 重复数据删除对于Amazon SQS特别有用,因为它本身最终是一致的,并且可能会收到两次相同的消息.
    5. 这与您问题中的原始概念相差不远,但存在差异:
      1. 您正在构思一个后台定时进程来扫描事件表(也就是发件箱表)并将事件发布到SQS
      2. NServiceBus在管道中执行处理程序- 使用Outbox,向传输发送消息(也就是将消息推送到SQS队列)只是管道中的最后一步.因此 - 无论何时处理消息,处理期间生成的任何出站消息都将在业务事务提交后立即调度 - 无需对事件表进行定时扫描.
    6. 注意:只有在存在环境NServiceBus处理程序事务时 - 即在NServiceBus管道中处理消息时,发件箱才会成功.在某些情况下不会出现这种情况,例如WebAPI请求管道.因此,NServiceBus建议使用您的API请求仅发送单个Command消息,然后将业务数据操作与后端端点服务中事务一致的命令处理程序中的进一步消息传递相结合.虽然他们的文档中的第3点与MSMQ比SQS传输更相关.

处理程序语义

关于您的提案的另一个评论 - 按照惯例,UserChangedEmailHandler更常见的是与响应电子邮件更改而做某事的服务相关联,而不是简单地参与电子邮件已更改的信息的传播.当您的系统发布了50个事件时,您是否希望50个不同的处理程序将这些消息推送到不同的队列?

上述系统使用通用框架通过传输传播消息,因此您可以UserChangedEmailHandler为订阅系统保留,并在其中包含用户更改其电子邮件时应发生的业务逻辑.