RabbitMQ频道创建指南

sma*_*bit 10 c# messaging rabbitmq

我正在编写一个简单的类,我的应用程序将使用RabbitMQ发送和接收消息.我已经阅读了尽可能多的How-Tos,博客文章,白皮书以及关于RabbitMQ的喜欢.大多数示例显示使用包含在using块中的连接和通道,并且通过说您应该将它们实现为单例来抵销它.特别是关于频道,我看到评论说你不应该同时使用单个频道的多个线程.

我正在用c#编写我的库.它是在第一次实例化时连接静态连接的单例.

我想为频道做同样的事情,但我打算使用相同的库来允许发布/订阅多个交换/队列.发布和订阅都可以从多个线程完成.

最后我的问题是:我应该如何实现频道创作?每条消息?让每个消费者拥有一个独特的私人频道,发布商同步访问一个独特的频道吗?你抓住了我的漂移.请记住,我打算使用单个服务器,有几十个消费者/发布者,而不是更多.

谢谢!

Eug*_*sky 7

编辑(2016-1-26):频道不是线程安全的.有关该文件的文件在2015年4月5月期间发生了变化.新文本:

不得在线程之间共享通道实例.应用程序应该更喜欢每个线程使用一个Channel,而不是跨多个线程共享相同的Channel.虽然通道上的某些操作可以安全地同时调用,但有些操作并不会导致错误的帧交错.在线程之间共享通道也会干扰*Publisher Confirms.

从您的问题来看,您似乎没有预定义的固定数量的线程,这些线程主要发布/订阅RabbitMQ(在这种情况下,您可以考虑创建一个通道作为线程初始化的一部分,或使用a ThreadLocal<IModel>).

如果并发RabbitMQ操作很少或消息大小总是很小,那么您可以简单地放置lock(channel)所有RabbitMQ发布/子操作.如果您需要以交错的方式传输多个请求 - 这就是首先使用的通道 - 使用任意线程,您可能需要创建一个通道池,例如ConcurrentQueue<IModel>,您将未使用的通道排队并在您需要的时间内出列队列他们.通道创建的开销非常低,我从性能测试中感觉到,通道创建过程不涉及任何网络,即看起来在客户首次使用时会在RabbitMQ服务器中自动创建通道.


OLD(2016-1-26之前):Java和.net实现现在几乎已经过时的细节:

Re:通道和多个线程,由于它依赖于实现,这有点令人困惑.

Java实现:通道是线程安全的:

通道实例可安全地供多个线程使用.

但是:

在多个线程之间共享Channel时,无法正确处理确认

.net实现:通道不是线程安全的:

如果多个线程需要访问特定的IModel实例,则应用程序应该强制执行互斥.

IModel操作错误序列化的症状包括但不限于

•在线路上发送无效的帧序列

•抛出NotSupportedExceptions ...

因此,除了Robin的有用答案,无论是否是线程安全,它都适用于.net实现,你不能只是共享一个连接.

  • 您可以共享连接.您"无法"在Connection上共享IModel(Channel). (2认同)

Jeo*_*uan 6

对于 ASP.NET Core,您可以利用 ObjectPool。创建 IPooledObjectPolicy

    using Microsoft.Extensions.ObjectPool;  
    using Microsoft.Extensions.Options;  
    using RabbitMQ.Client;  

    public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>  
    {  
        private readonly RabbitOptions _options;  

        private readonly IConnection _connection;  

        public RabbitModelPooledObjectPolicy(IOptions<RabbitOptions> optionsAccs)  
        {  
            _options = optionsAccs.Value;  
            _connection = GetConnection();  
        }  

        private IConnection GetConnection()  
        {  
            var factory = new ConnectionFactory()  
            {  
                HostName = _options.HostName,  
                UserName = _options.UserName,  
                Password = _options.Password,  
                Port = _options.Port,  
                VirtualHost = _options.VHost,  
            };  

            return factory.CreateConnection();  
        }  

        public IModel Create()  
        {  
            return _connection.CreateModel();  
        }  

        public bool Return(IModel obj)  
        {  
            if (obj.IsOpen)  
            {  
                return true;  
            }  
            else  
            {  
                obj?.Dispose();  
                return false;  
            }  
        }  
    }  
Run Code Online (Sandbox Code Playgroud)

然后为ObjectPool配置依赖注入

services.AddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();
services.AddSingleton(s =>
{
   var provider = s.GetRequiredService<ObjectPoolProvider>();
   return provider.Create(new RabbitModelPooledObjectPolicy());
});
Run Code Online (Sandbox Code Playgroud)

然后您可以注入ObjectPool<IModel>并使用它

var channel = pool.Get();
try
{
    channel.BasicPublish(...);
}
finally
{
    pool.Return(channel);
}

Run Code Online (Sandbox Code Playgroud)

资料来源:

https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/

https://developpaper.com/detailed-explanation-of-object-pools-various-usages-in-net-core/


Rob*_*bin 2

我无法评论 C# 实现的细节,但了解 Amqp 通道被设计为共享单个 TCP 连接(即启用多路复用)可能会有所帮助。单个通道一次只能发送一条消息或接收一条消息,但一个连接可以同时接收不同通道上的消息。假设您有 2 个 1GB 的大文件,通过 Amqp 发送给单个消费者,消息可能会被分割成 10K 块并以交错方式发送。您可以在设置连接时操纵默认的 Amqp 消息大小,这与您是否以及何时可能遇到交错有关;据我所知,此功能旨在帮助防止当多个消费者共享连接并且一个消费者接收大消息时出现饥饿。

HTH。