创建可供任意数量的读者和作者同时使用的无限通道。
然而Channel具有单个ChannelReader且ChannelWriter仅的属性,并且似乎没有一种方法可以围绕现有通道显式创建读取器/写入器。
我曾认为,如果你有多个生产者/消费者,他们应该共享同一个写入器/读取器实例,这是不正确的吗?“读者/作者的数量”是指并发访问而不是类实例的数量吗?
ConcurrentQueue<T>我最近使用和构建了一个消费者/生产者系统SemaphoreSlim。然后利用新System.Threading.Channel类制作了另一个替代系统。
使用 BenchmarkDotNet 对两个系统进行基准测试后,将 1000 个项目写入两个系统 1000 次(并等待读者完成),我得到以下结果:
| Method | ItemsCount | Iterations | Mean | Error | StdDev | Median | Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
| MyQueue | 1000 | 1000 | 19,379.4 us | 1,230.30 us | 3,569.33 us | 18,735.6 us | 8235.02 KB |
| MyChannel | 1000 | 1000 | 45,858.2 us | 1,298.42 us | 3,704.46 us | 45,689.2 us | 72.11 KB |
Run Code Online (Sandbox Code Playgroud)
实施ConcurrentQueue …
c# performance producer-consumer concurrent-queue system.threading.channels
我正在使用 Microsoft.AspNetCore.SignalR 2.1 v1.0.4,并且使用 v1.0.4 的打字稿客户端使用 ChannelReader 流。
频道显示特定于单个实体的事件数据,因此当客户端的用户导航到该单个实体的页面呈现数据时,预计客户端将订阅频道。如果用户导航到同一页面但针对不同的实体,则客户端将进行另一个订阅调用。
现在我的问题是关于如何最好地取消订阅流,以及一般来说,流的生命周期对于集线器连接停止/启动场景下的客户端来说是什么,以及服务器是否显式中止连接(由于 access_token 超时从而触发客户端刷新他们的连接)?
似乎没有从 api 中浮现出一些连接状态,所以我目前使用 RxJs Subject 将一些连接状态呈现给我的 UI 组件/服务,即当集线器连接的启动调用成功时,我浮现“真”,当onclose 回调称为 I 表面“假”。这允许我尝试在先前订阅的流上调用 dispose 以在连接断开/停止期间清理内容,然后在成功启动调用时再次调用订阅流。
我试过在一个流上调用 dispose 如果集线器已连接,这很好,但如果连接处于断开状态,它会出错。我想知道这是否是一个错误。即使集线器断开连接,我也应该能够处理流吗?
可以只做一个delete streamsubscription然后根据需要重新创建,还是会以任何方式泄漏?
我会保持这个简短
阅读“在 ASP.NET Core SignalR 中使用流式传输”一文(第一部分,服务器到客户端流式传输)后, ChannelReader[T]和IAsyncEnumerable[T]之间有什么区别?
我正在使用ChannelfromSystem.Threading.Channels并想要批量读取项目(5 个项目),我有如下方法,
public class Batcher
{
private readonly Channel<MeasurementViewModel> _channel;
public Batcher()
{
_channel = Channel.CreateUnbounded<MeasurementViewModel>();
}
public async Task<MeasurementViewModel[]> ReadBatchAsync(int batchSize, CancellationToken stoppingToken)
{
var result = new MeasurementViewModel[batchSize];
for (var i = 0; i < batchSize; i++)
{
result[i] = await _channel.Reader.ReadAsync(stoppingToken);
}
return result;
}
}
Run Code Online (Sandbox Code Playgroud)
在 ASP.NET Core 后台服务中,我使用它,如下所示,
public class WriterService : BackgroundService
{
private readonly Batcher _batcher;
public WriterService(Batcher batcher)
{
_batcher = batcher;
}
protected override async Task ExecuteAsync(CancellationToken …Run Code Online (Sandbox Code Playgroud) ChannelReader<T>.ReadAllAsync被 CancellationToken 取消时是否抛出任何异常?它似乎没有抛出OperationCanceledException/TaskCanceledException?
我知道如果以“即发即忘”的方式调用这两个方法,即_ = SendLoopAsync(); _ = ReceiveLoopAsync();,它会使任务崩溃,并且不会显示消息/异常,因为它们没有被等待,这意味着我们正在丢失异常。
我不希望它在不让我知道它实际上已经崩溃/被取消的情况下使该任务崩溃,这意味着我应该将整个 SendLoopAsync 包装在 try/catch 中,而不是 ReadAllAsync 分支之间的内容。
一个代表其行为的小例子将不胜感激。
var clientWebSocket = new ClientWebSocket();
await clientWebSocket.ConnectAsync(new Uri("wss://www.deribit.com/ws/api/v2"), CancellationToken.None).ConfigureAwait(false);
var client = new ChannelWebSocket(clientWebSocket);
for (var i = 1; i <= 10; i++)
{
client.Output.TryWrite($"Item: {i}");
}
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));
await client.StartAsync(cts.Token).ConfigureAwait(false); // blocks the UI
Console.ReadLine();
public class ChannelExample
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public ChannelExample(WebSocket webSocket)
{ …Run Code Online (Sandbox Code Playgroud) 假设我有很多生产者,1个消费者未绑定 Channel,有一个消费者:
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
}
Run Code Online (Sandbox Code Playgroud)
问题在于该consume函数会进行一些 IO 访问,并且可能还会进行一些网络访问,因此在消耗 1 条消息之前可能会产生更多消息。但由于IO资源不能并发访问,所以我不能有很多消费者,也不能把函数扔到consume一个Task中然后忘记它。
该consume功能可以轻松修改以获取多条消息并批量处理它们。所以我的问题是,是否有一种方法可以让消费者在尝试访问通道队列时获取通道队列中的所有消息,如下所示:
while (true) {
Message[] messages = await channel.Reader.TakeAll();
await consumeAll(messages);
}
Run Code Online (Sandbox Code Playgroud)
编辑:我能想到的 1 个选项是:
List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
Message msg;
while (channel.Reader.TryRead(out msg))
messages.Add(msg);
if (messages.Count > 0)
{
await consumeAll(messages);
messages.Clear();
}
}
Run Code Online (Sandbox Code Playgroud)
但我觉得这应该是一个更好的方法来做到这一点。
我遇到一种情况,我需要从多个 IAsyncEnumerable 源接收数据。为了提高性能,应该以并行方式执行。
我已经使用AsyncAwaitBestPractices、System.Threading.Tasks.Dataflow和System.Linq.Async nuget 包编写了这样的代码来实现此目标:
public static async IAsyncEnumerable<T> ExecuteSimultaneouslyAsync<T>(
this IEnumerable<IAsyncEnumerable<T>> sources,
int outputQueueCapacity = 1,
TaskScheduler scheduler = null)
{
var sourcesCount = sources.Count();
var channel = outputQueueCapacity > 0
? Channel.CreateBounded<T>(sourcesCount)
: Channel.CreateUnbounded<T>();
sources.AsyncParallelForEach(
async body =>
{
await foreach (var item in body)
{
await channel.Writer.WaitToWriteAsync();
await channel.Writer.WriteAsync(item);
}
},
maxDegreeOfParallelism: sourcesCount,
scheduler: scheduler)
.ContinueWith(_ => channel.Writer.Complete())
.SafeFireAndForget();
while (await channel.Reader.WaitToReadAsync())
yield return await channel.Reader.ReadAsync();
}
public static async Task AsyncParallelForEach<T>( …Run Code Online (Sandbox Code Playgroud) c# producer-consumer task-parallel-library iasyncenumerable system.threading.channels
多个消费者是否可以接收同一条消息。我有一个生产者,它从网络套接字生成刻度数据(股票市场)。我现在有一个消费者每秒收到 1000 条消息,效果很好。但现在我想让多个消费者使用System.Threading.Channels. 单个生产者/消费者的完整工作代码。
class ConsumerOne
{
private readonly Channel<DummyData> _tickQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly string _tag;
public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
{
_tickQueue = tickQueue;
_cancellationTokenSource = cancellationTokenSource;
_tag = tag;
}
public async Task StartConsuming()
{
await foreach (var message in _tickQueue.Reader.ReadAllAsync(
cancellationToken: _cancellationTokenSource.Token))
{
// Business logic of One
Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
}
}
}
public class DummyData
{
public long Ticks { get; set; }
public DateTime DateTime …Run Code Online (Sandbox Code Playgroud) c# producer-consumer publish-subscribe .net-core system.threading.channels
我使用 System.Threading.Channels 编写了异步队列。但是当我运行程序进行测试时,随机抛出以下异常并且工作线程被停止。
System.InvalidOperationException: The asynchronous operation has not completed.
at System.Threading.Channels.AsyncOperation.ThrowIncompleteOperationException()
at System.Threading.Channels.AsyncOperation`1.GetResult(Int16 token)
at AsyncChannels.Worker() in g:\src\gitrepos\dotnet-sandbox\channelstest\AsyncChannelsTest.cs:line 26
Run Code Online (Sandbox Code Playgroud)
如果异常被捕获并忽略,则代码正在运行。但我想摆脱原因不明的错误。
这是我的环境和最少的代码。
System.InvalidOperationException: The asynchronous operation has not completed.
at System.Threading.Channels.AsyncOperation.ThrowIncompleteOperationException()
at System.Threading.Channels.AsyncOperation`1.GetResult(Int16 token)
at AsyncChannels.Worker() in g:\src\gitrepos\dotnet-sandbox\channelstest\AsyncChannelsTest.cs:line 26
Run Code Online (Sandbox Code Playgroud) c# ×9
asp.net-core ×2
.net ×1
.net-core ×1
async-await ×1
batching ×1
channel ×1
performance ×1