bom*_*ina 4 c# producer-consumer publish-subscribe .net-core system.threading.channels
多个消费者是否可以接收同一条消息。我有一个生产者,它从网络套接字生成刻度数据(股票市场)。我现在有一个消费者每秒收到 1000 条消息,效果很好。但现在我想让多个消费者使用System.Threading.Channels. 单个生产者/消费者的完整工作代码。
class ConsumerOne
{
private readonly Channel<DummyData> _tickQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly string _tag;
public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
{
_tickQueue = tickQueue;
_cancellationTokenSource = cancellationTokenSource;
_tag = tag;
}
public async Task StartConsuming()
{
await foreach (var message in _tickQueue.Reader.ReadAllAsync(
cancellationToken: _cancellationTokenSource.Token))
{
// Business logic of One
Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
}
}
}
public class DummyData
{
public long Ticks { get; set; }
public DateTime DateTime { get; set; }
public decimal Price { get; set; }
}
class Producer
{
private readonly Channel<DummyData> _tickQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
public Producer(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource)
{
_tickQueue = tickQueue;
_cancellationTokenSource = cancellationTokenSource;
}
public async Task StartProducing()
{
Random r = new Random();
for (int i = 0; i < 10; i++)
{
await _tickQueue.Writer.WriteAsync(new DummyData()
{
DateTime = DateTime.Now,
Ticks = DateTime.Now.Ticks,
Price = Convert.ToDecimal(r.NextDouble() * r.Next(100, 105))
}, _cancellationTokenSource.Token);
await Task.Delay(r.Next(50, 500));
}
}
}
internal class MultipleConsumersEg
{
private static Channel<DummyData> tickQueue;
private static readonly CancellationTokenSource TickQueueCancellationTokenSource = new CancellationTokenSource();
public static async Task Main(string[] args)
{
tickQueue = Channel.CreateUnbounded<DummyData>();
Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
consumerOne.StartConsuming();
p.StartProducing();
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud)
上面的代码片段适用于单个生产者/消费者,小提琴链接。现在我想要另一个消费者用于不同的策略(每个消费者用于一个策略)。
class ConsumerTwo
{
private readonly Channel<DummyData> _tickQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly string _tag;
public ConsumerTwo(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
{
_tickQueue = tickQueue;
_cancellationTokenSource = cancellationTokenSource;
_tag = tag;
}
public async Task StartConsuming()
{
await foreach (var message in _tickQueue.Reader.ReadAllAsync(
cancellationToken: _cancellationTokenSource.Token))
{
// Business logic of Two
Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
}
}
}
public static async Task Main(string[] args)
{
tickQueue = Channel.CreateUnbounded<DummyData>();
Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
consumerOne.StartConsuming();
ConsumerTwo consumerTwo = new ConsumerTwo(tickQueue, TickQueueCancellationTokenSource, "TWO");
consumerTwo.StartConsuming();
p.StartProducing();
Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)
添加第二个消费者后,它会消费数据,但两个消费者看不到相同的数据。这里我希望所有消费者都能收到全部 10 条消息。考虑到我将来最多可能有 50 个消费者,所有人都应该收到相同的消息。
输出:
class ConsumerOne
{
private readonly Channel<DummyData> _tickQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly string _tag;
public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
{
_tickQueue = tickQueue;
_cancellationTokenSource = cancellationTokenSource;
_tag = tag;
}
public async Task StartConsuming()
{
await foreach (var message in _tickQueue.Reader.ReadAllAsync(
cancellationToken: _cancellationTokenSource.Token))
{
// Business logic of One
Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
}
}
}
public class DummyData
{
public long Ticks { get; set; }
public DateTime DateTime { get; set; }
public decimal Price { get; set; }
}
class Producer
{
private readonly Channel<DummyData> _tickQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
public Producer(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource)
{
_tickQueue = tickQueue;
_cancellationTokenSource = cancellationTokenSource;
}
public async Task StartProducing()
{
Random r = new Random();
for (int i = 0; i < 10; i++)
{
await _tickQueue.Writer.WriteAsync(new DummyData()
{
DateTime = DateTime.Now,
Ticks = DateTime.Now.Ticks,
Price = Convert.ToDecimal(r.NextDouble() * r.Next(100, 105))
}, _cancellationTokenSource.Token);
await Task.Delay(r.Next(50, 500));
}
}
}
internal class MultipleConsumersEg
{
private static Channel<DummyData> tickQueue;
private static readonly CancellationTokenSource TickQueueCancellationTokenSource = new CancellationTokenSource();
public static async Task Main(string[] args)
{
tickQueue = Channel.CreateUnbounded<DummyData>();
Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
consumerOne.StartConsuming();
p.StartProducing();
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud)
所有消息都应该同时接收到消费者。
通道是低级异步发布者/订阅者、顺序和操作保留队列。它们提供创建通信顺序处理管道所需的低级通信/排队功能。其他库看起来更优雅,只是因为它们在自己的发布/订阅队列之上添加了自己的代码。
这个名字意义重大。如果您愿意,通道可以实现进程/工作人员或发布者/订阅者之间的通信“通道”。它们是每个管道函数或对象的输入和输出,而不仅仅是内部集合。当以这种方式使用时,很容易实现一些非常复杂的行为。通道本身通常由工作人员拥有,它们不是某种全局程序状态。
您的问题并不严格要求广播或多播。不过,编写广播函数非常简单。它可以是简单(或简单化)的东西:
public static async Task CopyTo<T>(this ChannelReader<T> input,
IList<ChannelWriter<T>> outputs,
CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAync(token).ConfigureAwait(false))
{
foreach(var o in outputs)
{
await o.WriteAsync(msg);
}
}
foreach(var o in outputs)
{
o.TryComplete();
}
}
Run Code Online (Sandbox Code Playgroud)
这会在各处复制相同的消息。只要输出通道不受限制或至少具有足够大的容量以避免填满,它就不会阻塞。
创建一个RouteTo按标签路由消息的方法也很容易,例如
public static async Task RouteTo<T>(this ChannelReader<T> input,
IDictionary<string,ChannelWriter<T>> outputs,
Func<T,string> selector,
CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAync(token).ConfigureAwait(false))
{
var key=selector(msg);
if (outputs.TryGetValue(key, out var o)
{
await o.WriteAsync(msg);
}
}
foreach(var o in outputs.Values)
{
o.TryComplete();
}
}
Run Code Online (Sandbox Code Playgroud)
错误处理、取消和等待必须根据应用程序的要求进行调整。例如,循环意味着如果一个通道是容量有限的通道,则其他通道将不得不等待。如果收集并等待所有写入任务,则可以避免这种情况Task.WhenAll
await Task.WhenAll(outputs.Select(o=>o.WriteAsync(msg)));
Run Code Online (Sandbox Code Playgroud)
假设生产者是一个 FIX 侦听器类,它收到的每条消息都应该通过ChannelReader<>输出属性发布:
public class FixProducer
{
Channel<DummyData> _channel;
public ChannelReader<DummyData> Output=>_channel.Reader;
SomeFIXEngine _engine;
public FixPublisher(SomeFIXEngine engine)
{
_engine=engine;
_channel=Channel.CreateUnbounded<DummyData>();
}
public async Task StartProducing(CancellationToken token=default)
{
var writer=_channel.Writer;
for (...)
{
if(token.IsCancellationRequested)
{
break;
}
var data=_engine.GetSomeData();
await _writer.WriteAsync(data);
}
writer.Complete();
}
}
Run Code Online (Sandbox Code Playgroud)
ChannelWriter消费者可以通过自己的Input 属性接收输入:
interface IConsumer<T>
{
ChannelWriter<T> Input {get;}
}
class ConsumerOne:IConsumer<DummyData>
{
private readonly Channel<DummyData> _input;
public ChannelWriter<DummyData> Input=>_input.Writer;
public ConsumerOne(...)
{
_input=Channel.CreateUnbounderd<DummyData>();
}
public async Task StartConsuming(CancellationToken token=default)
{
await foreach (var message in _input.Reader.ReadAllAsync(token).ConfigureAwait(false))
{
...
}
}
}
Run Code Online (Sandbox Code Playgroud)
CopyTo现在可用于将 FIX 消息复制到所有消费者:
var producer=new FixProducer(...);
var consumerOne=new ConsumerOne(...);
var consumerTwo=new ConsumerTwo(...);
...
var copyTask=producer.Output.CopyTo(new[]{consumerOne.Input,consumerTwo.Input});
producer.StartProducing(...);
consumerOne.StartConsuming(...);
...
Run Code Online (Sandbox Code Playgroud)
既然通道由消费者拥有,就不需要有公共StartConsuming方法,可以在构造函数中调用它。
class ConsumerOne:IConsumer<DummyData>
{
private readonly Channel<DummyData> _input;
Task _consumeTask;
public ChannelWriter<DummyData> Input=>_input.Writer;
public ConsumerOne(...,CancellationToken token=default)
{
_input=Channel.CreateUnbounderd<DummyData>();
_consumeTask=StartConsuming(token);
}
async Task StartConsuming(CancellationToken token=default)
{
await foreach (var message in _input.Reader.ReadAllAsync(token).ConfigureAwait(false))
{
...
}
}
...
}
Run Code Online (Sandbox Code Playgroud)
消费者任务将继续运行,直到上游生产者调用Complete()输入 ChannelWriter。
| 归档时间: |
|
| 查看次数: |
1758 次 |
| 最近记录: |