使用 RabbitMQ 的 Masstransit 使用 .net 为给定类型创建 2 个交换

Kar*_*hik 6 masstransit

下面是在 ASP.net Core 6.0 API 中开发的示例 POC,它使用 MassTransit 和 RabbitMQ 来模拟使用 MassTransit 消费者的简单发布/订阅。然而,当执行代码时,会导致在 RabbitMQ 中创建 2 个交换器和 1 个队列。

程序.cs

builder.Services.AddMassTransit(msConfig =>
{
    msConfig.AddConsumers(Assembly.GetEntryAssembly());

    msConfig.UsingRabbitMq((hostcontext, cfg) =>
    {
        cfg.Host("localhost", 5700, "/", h =>
        {

           h.Username("XXXXXXXXXXX");
           h.Password("XXXXXXXXXXX");

        });

        cfg.ConfigureEndpoints(hostcontext);
    });
});
Run Code Online (Sandbox Code Playgroud)

订单消费者.cs

public class OrderConsumer : IConsumer<OrderDetails>
{
    readonly ILogger<OrderConsumer> _logger;
    public OrderConsumer(ILogger<OrderConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<OrderDetails> context)
    {
        _logger.LogInformation("Message picked by OrderConsumer. OrderId : {OrderId}", context.Message.OrderId);

        return Task.CompletedTask;
    }
}
Run Code Online (Sandbox Code Playgroud)

模型

    public class OrderDetails
{
    public int OrderId { get; set; }
    public string OrderName { get; set; }
    public int Quantity { get; set; }
}
Run Code Online (Sandbox Code Playgroud)

控制器

    readonly IPublishEndpoint _publishEndpoint;

    [HttpPost("PostOrder")]
    public async Task<ActionResult> PostOrder(OrderDetails orderDetails)
    {
        await _publishEndpoint.Publish<OrderDetails>(orderDetails);

        return Ok();
    }
Run Code Online (Sandbox Code Playgroud)

来自 Asp.Net 的输出 来自 Asp.Net 的输出

正如突出显示的,创建了 2 个交换示例:OrderDetails 和 Order。

在此输入图像描述

但是,Sample:OrderDetails 绑定到 Order (Exchange)

在此输入图像描述

并且订单(交换)路由到“订单”队列。

在此输入图像描述

在此输入图像描述

所以,问题是关于创建的 2 个交换,我不确定这是根据设计还是代码上的错误导致创建的,如果是根据设计,为什么需要 2 个交换。

小智 7

当我第一次开始使用 MassTransit 时,我也在思考同样的问题,最终明白如下:您正在通过 MassTransit 路由两种类型的消息:事件和命令。事件被多播给潜在的多个消费者,命令被多播给单个消费者。每个消费者都有自己的输入队列,消息通过交换器路由到该队列。

对于每种消息类型,MassTransit 默认情况下会根据消息类型创建一个扇出交换器,并为该消息的每个使用者创建一个扇出交换器和一个队列。

这对于 events 来说绝对有意义,因为您正在使用事件类型发布事件(不知道谁或是否有人会使用它),所以在您的情况下,您发布到交换OrderDetails。MassTransit 必须确保此事件的所有消费者都绑定到此交换。在这种情况下,您有一个消费者,OrderConsumer。默认情况下,MassTransit 根据该消费者的类型名称生成消费者交换的名称,并删除后缀Consumer。该消费者的实际输入队列绑定到该交换器。所以你会得到这样的东西:

EventTypeExchange => ConsumerExchange => ConsumerQueue

或者在你的情况下:

Sample:OrderDetails(基于类型Sample.OrderDetails)=> Order(基于类型OrderConsumer)=> Order(再次基于OrderConsumer类型)

对于命令来说,这一点不太明显,因为一条命令只能由一个消费者使用。事实上,您实际上可以告诉 MassTransit 不要根据命令类型创建交换。但是,您接下来要做的不是基于命令类型,而是基于命令处理程序类型来路由命令,这实际上不是一个好方法,因为现在您必须知道 - 发送命令时 - 类型名称是什么处理程序是。这会引入您真正不想要的耦合。因此,我认为最好根据命令类型保留交换并根据命令类型路由到它们。

正如 Chriss(MassTransit 的作者)在MassTransit RabbitMQ 深入视频 (YouTube)中提到的那样,此设置还允许您做一些有趣的事情,例如将消息虹吸到另一个队列以进行监视/审核/调试,只需创建一个新队列并将其绑定到现有的扇出交换。

以上所有内容都是基于我对框架的使用,因此我可能犯了一些错误,但至少对我来说确实有意义。RabbitMQ 的路由选项非常灵活,因此 Chriss 可以选择一种不同的方法(例如 Brighter,一个“竞争”库以不同的方式使用 RabbitMQ 来实现相同的结果),但这个方法也有优点。MassTransit 与 NServiceBus 或 Brighter 等其他框架不同,在技术上并不真正区分或关心这两者之间的语义差异,例如,您可以像发送或发布事件一样发送或发布命令。