我正在使用大众运输和rabbitmq。我的队列有各种类型的消息传递给它。这些类型中的大多数都实现了 IHaveOrganisationKey。我想向管道添加一个分区程序,以确保同时处理具有给定组织密钥的一条消息(任何类型的)。目标是限制并行处理同一组织的多个消息时出现的并发问题,同时允许并行处理来自不同组织的消息。
配置代码
sbc.ReceiveEndpoint(host, this.queueConfiguration.QueueName, ep =>
{
ep.PrefetchCount = this.busConfiguration.PrefetchCount;
this.queueConfiguration.ConfigureEndpoint(ep);
this.queueConfiguration.SubscribeMessages(worker, ep);
});
Run Code Online (Sandbox Code Playgroud)
在队列配置中:
public override void ConfigureEndpoint(IRabbitMqReceiveEndpointConfigurator ep)
{
// This is incomplete. Am I on the right track here?
ep.UsePartitioner(1, x => x.TryGetMessage<IHaveOrganisationKey>());
}
Run Code Online (Sandbox Code Playgroud)