我可以在大众运输中的端点配置中使用 UsePartitioner 来对传递到同一队列的多种消息类型进行分区吗

Mar*_*per 4 masstransit rabbitmq

我正在使用大众运输和rabbitmq。我的队列有各种类型的消息传递给它。这些类型中的大多数都实现了 IHaveOrganisationKey。我想向管道添加一个分区程序,以确保同时处理具有给定组织密钥的一条消息(任何类型的)。目标是限制并行处理同一组织的多个消息时出现的并发问题,同时允许并行处理来自不同组织的消息。

配置代码

sbc.ReceiveEndpoint(host, this.queueConfiguration.QueueName, ep =>
{
    ep.PrefetchCount = this.busConfiguration.PrefetchCount;
    this.queueConfiguration.ConfigureEndpoint(ep);
    this.queueConfiguration.SubscribeMessages(worker, ep);
});
Run Code Online (Sandbox Code Playgroud)

在队列配置中:

public override void ConfigureEndpoint(IRabbitMqReceiveEndpointConfigurator ep)
{
    // This is incomplete. Am I on the right track here?
    ep.UsePartitioner(1, x => x.TryGetMessage<IHaveOrganisationKey>());
}
Run Code Online (Sandbox Code Playgroud)

Chr*_*son 5

在这种情况下,您需要单独创建分区器,然后将其传递给每个消息类型配置。

var p = ep.CreatePartitioner(8);

ep.Consumer<ConsumerA>(x => x.Message<A>(m => m.UsePartitioner(p, c => c.Message.OrgKey)));
ep.Consumer<ConsumerB>(x => x.Message<B>(m => m.UsePartitioner(p, c => c.Message.OrgKey)));
Run Code Online (Sandbox Code Playgroud)

如果你的消费者有多种消息类型,你可以Message在配置中指定多条语句。

  • 干杯,克里斯!起初,我假设传递给 CreatePartitioner 的 partitionCount 参数指的是每个分区的消息数。我现在明白这是一个全局并发限制,并且单个分区的消息数始终为一个。 (2认同)