通道:多个消费者是否可以从单个生产者广播/接收相同的消息?

bom*_*ina 4 c# producer-consumer publish-subscribe .net-core system.threading.channels

多个消费者是否可以接收同一条消息。我有一个生产者,它从网络套接字生成刻度数据(股票市场)。我现在有一个消费者每秒收到 1000 条消息,效果很好。但现在我想让多个消费者使用System.Threading.Channels. 单个生产者/消费者的完整工作代码。

class ConsumerOne
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _tag;

    public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
        _tag = tag;
    }

    public async Task StartConsuming()
    {
        await foreach (var message in _tickQueue.Reader.ReadAllAsync(
                           cancellationToken: _cancellationTokenSource.Token))
        {
            // Business logic of One
            Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
        }
    }
}

public class DummyData
{
    public long Ticks { get; set; }
    public DateTime DateTime { get; set; }
    public decimal Price { get; set; }
}

class Producer
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;

    public Producer(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
    }

    public async Task StartProducing()
    {
        Random r = new Random();
        for (int i = 0; i < 10; i++)
        {

            await _tickQueue.Writer.WriteAsync(new DummyData()
            {
                DateTime = DateTime.Now,
                Ticks = DateTime.Now.Ticks,
                Price = Convert.ToDecimal(r.NextDouble() * r.Next(100, 105))
            }, _cancellationTokenSource.Token);
            await Task.Delay(r.Next(50, 500));
        }
    }
}

internal class MultipleConsumersEg
{
    private static Channel<DummyData> tickQueue;
    private static readonly CancellationTokenSource TickQueueCancellationTokenSource = new CancellationTokenSource();
    public static async Task Main(string[] args)
    {
        tickQueue = Channel.CreateUnbounded<DummyData>();

        Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
        ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
         consumerOne.StartConsuming();

         p.StartProducing();

        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

上面的代码片段适用于单个生产者/消费者,小提琴链接。现在我想要另一个消费者用于不同的策略(每个消费者用于一个策略)。

class ConsumerTwo
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _tag;

    public ConsumerTwo(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
        _tag = tag;
    }

    public async Task StartConsuming()
    {
        await foreach (var message in _tickQueue.Reader.ReadAllAsync(
                           cancellationToken: _cancellationTokenSource.Token))
        {
            // Business logic of Two
            Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
        }
    }
}

    public static async Task Main(string[] args)
    {
        tickQueue = Channel.CreateUnbounded<DummyData>();

        Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
        ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
         consumerOne.StartConsuming();

        ConsumerTwo consumerTwo = new ConsumerTwo(tickQueue, TickQueueCancellationTokenSource, "TWO");
         consumerTwo.StartConsuming();

         p.StartProducing();

        Console.ReadLine();
    }
Run Code Online (Sandbox Code Playgroud)

添加第二个消费者后,它会消费数据,但两个消费者看不到相同的数据。这里我希望所有消费者都能收到全部 10 条消息。考虑到我将来最多可能有 50 个消费者,所有人都应该收到相同的消息。

输出:

class ConsumerOne
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _tag;

    public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
        _tag = tag;
    }

    public async Task StartConsuming()
    {
        await foreach (var message in _tickQueue.Reader.ReadAllAsync(
                           cancellationToken: _cancellationTokenSource.Token))
        {
            // Business logic of One
            Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
        }
    }
}

public class DummyData
{
    public long Ticks { get; set; }
    public DateTime DateTime { get; set; }
    public decimal Price { get; set; }
}

class Producer
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;

    public Producer(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
    }

    public async Task StartProducing()
    {
        Random r = new Random();
        for (int i = 0; i < 10; i++)
        {

            await _tickQueue.Writer.WriteAsync(new DummyData()
            {
                DateTime = DateTime.Now,
                Ticks = DateTime.Now.Ticks,
                Price = Convert.ToDecimal(r.NextDouble() * r.Next(100, 105))
            }, _cancellationTokenSource.Token);
            await Task.Delay(r.Next(50, 500));
        }
    }
}

internal class MultipleConsumersEg
{
    private static Channel<DummyData> tickQueue;
    private static readonly CancellationTokenSource TickQueueCancellationTokenSource = new CancellationTokenSource();
    public static async Task Main(string[] args)
    {
        tickQueue = Channel.CreateUnbounded<DummyData>();

        Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
        ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
         consumerOne.StartConsuming();

         p.StartProducing();

        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

所有消息都应该同时接收到消费者。

Pan*_*vos 6

通道是低级异步发布者/订阅者、顺序和操作保留队列。它们提供创建通信顺序处理管道所需的低级通信/排队功能。其他库看起来更优雅,只是因为它们在自己的发布/订阅队列之上添加了自己的代码。

这个名字意义重大。如果您愿意,通道可以实现进程/工作人员或发布者/订阅者之间的通信“通道”。它们是每个管道函数或对象的输入和输出,而不仅仅是内部集合。当以这种方式使用时,很容易实现一些非常复杂的行为。通道本身通常由工作人员拥有,它们不是某种全局程序状态。

您的问题并不严格要求广播或多播。不过,编写广播函数非常简单。它可以是简单(或简单化)的东西:

public static async Task CopyTo<T>(this ChannelReader<T> input,
        IList<ChannelWriter<T>> outputs, 
        CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAync(token).ConfigureAwait(false))
    {
        foreach(var o in outputs)
        {
            await o.WriteAsync(msg);
        }
    }
    foreach(var o in outputs)
    {
        o.TryComplete();
    }
}
Run Code Online (Sandbox Code Playgroud)

这会在各处复制相同的消息。只要输出通道不受限制或至少具有足够大的容量以避免填满,它就不会阻塞。

创建一个RouteTo按标签路由消息的方法也很容易,例如

public static async Task RouteTo<T>(this ChannelReader<T> input,
        IDictionary<string,ChannelWriter<T>> outputs, 
        Func<T,string> selector, 
        CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAync(token).ConfigureAwait(false))
    {
        var key=selector(msg);
        if (outputs.TryGetValue(key, out var o)
        {
            await o.WriteAsync(msg);
        }
    }
    foreach(var o in outputs.Values)
    {
        o.TryComplete();
    }
}
Run Code Online (Sandbox Code Playgroud)

错误处理、取消和等待必须根据应用程序的要求进行调整。例如,循环意味着如果一个通道是容量有限的通道,则其他通道将不得不等待。如果收集并等待所有写入任务,则可以避免这种情况Task.WhenAll

await Task.WhenAll(outputs.Select(o=>o.WriteAsync(msg)));
Run Code Online (Sandbox Code Playgroud)

假设生产者是一个 FIX 侦听器类,它收到的每条消息都应该通过ChannelReader<>输出属性发布:

public class FixProducer
{
   Channel<DummyData> _channel;

   public ChannelReader<DummyData> Output=>_channel.Reader;

   SomeFIXEngine _engine;
   public FixPublisher(SomeFIXEngine engine)
   {
       _engine=engine;
       _channel=Channel.CreateUnbounded<DummyData>();
   }

   public async Task StartProducing(CancellationToken token=default)
   {
       var writer=_channel.Writer;
       for (...)
       {
           if(token.IsCancellationRequested)
           {
               break;
           }
           var data=_engine.GetSomeData();
           await _writer.WriteAsync(data);
       }
       writer.Complete();
   }
}
Run Code Online (Sandbox Code Playgroud)

ChannelWriter消费者可以通过自己的Input 属性接收输入:

interface IConsumer<T>
{
    ChannelWriter<T> Input {get;}
}

class ConsumerOne:IConsumer<DummyData>
{
    private readonly Channel<DummyData> _input;
    
    public ChannelWriter<DummyData> Input=>_input.Writer;

    public ConsumerOne(...)
    {
        _input=Channel.CreateUnbounderd<DummyData>();        
    }

    public async Task StartConsuming(CancellationToken token=default)
    {
        await foreach (var message in _input.Reader.ReadAllAsync(token).ConfigureAwait(false))
        {
            ...
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

CopyTo现在可用于将 FIX 消息复制到所有消费者:

var producer=new FixProducer(...);
var consumerOne=new ConsumerOne(...);
var consumerTwo=new ConsumerTwo(...);
...

var copyTask=producer.Output.CopyTo(new[]{consumerOne.Input,consumerTwo.Input});

producer.StartProducing(...);
consumerOne.StartConsuming(...);
...
Run Code Online (Sandbox Code Playgroud)

既然通道由消费者拥有,就不需要有公共StartConsuming方法,可以在构造函数中调用它。

class ConsumerOne:IConsumer<DummyData>
{
    private readonly Channel<DummyData> _input;
    Task _consumeTask;
    public ChannelWriter<DummyData> Input=>_input.Writer;

    public ConsumerOne(...,CancellationToken token=default)
    {
        _input=Channel.CreateUnbounderd<DummyData>();        
        _consumeTask=StartConsuming(token);
    }

    async Task StartConsuming(CancellationToken token=default)
    {
        await foreach (var message in _input.Reader.ReadAllAsync(token).ConfigureAwait(false))
        {
            ...
        }
    }
    ...
}
Run Code Online (Sandbox Code Playgroud)

消费者任务将继续运行,直到上游生产者调用Complete()输入 ChannelWriter。