为什么我在跳过的队列中收到消息

phi*_*hil 8 masstransit

我在 fork/join 配置中有一个 saga 设置。

在传奇中定义的事件

  • FileMetadataMsg
  • FileReadyMsg
  • SomeOtherMsg

当文件进入单独的侦听器时,进程开始。

  • 出版 SagaStart(correlationId)
  • 出版 FileSavedToMsg(correlationId, fileLoc)
  • 出版 FileMetadataMsg(correlationId, metadata)
  • 出版 FileReadyMsg(correlationId, fileLoc)

的下游端点对文件做了一些工作

Consumer<FileSavedToMsg>

  • 出版 SomeOtherMsg(GotTheFileMsg.correlationId, data)

我在 saga_skipped 队列中收到一个 FileSavedToMsg。我只能假设这是由于 FileSavedToMsg 上有一个correlationId,因为传奇本身没有在其状态机中使用 FileSavedToMsg 并且没有Event<FileSavedToMsg>.

如果这是为什么...我应该在 CorrelationId 以外的字段中传递correlationId,所以传奇看不到它?我在某个地方需要它,所以我可以用它标记 SomeOtherMsg。

这是 saga 端点的定义方式

return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });              

            cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
            {
                epCfg.StateMachineSaga(machine, repository);
            });
});
Run Code Online (Sandbox Code Playgroud)

以下是定义工作端点的方式

return Bus.Factory.CreateUsingRabbitMq(x =>
{
    var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
        {
            c.PrefetchCount = 1;
            c.Instance(_studyCreatedMsgConsumer);
        });
 });
Run Code Online (Sandbox Code Playgroud)

它们在同一台机器上运行,但在单独的控制台/Topshelf 应用程序中。

Chr*_*son 11

如果您在该接收端点上的消费者未使用的队列上获取消息,则可能是您之前正在使用该消息类型并将其从消费者(或 saga,在您的情况下)中删除,或者您正在使用来自其他目的的队列,它消耗了该消息类型。

无论哪种方式,如果您进入 RabbitMQ 管理控制台并查找队列,您可以展开 Bindings 人字形,单击转到同名的交换(这是标准的 MassTransit 约定),然后展开交换的绑定查看哪些消息类型(命名为 .NET 类型名称的交换)绑定到该交换。

如果您看到端点未消耗的一个,那就是罪魁祸首。您可以使用 UI 解除绑定,之后发布的消息将不再发送到队列。