avo*_*avo 72 .net c# collections task-parallel-library async-await
我想await在BlockingCollection<T>.Take()异步的结果,所以我不阻止线程.寻找这样的事情:
var item = await blockingCollection.TakeAsync();
Run Code Online (Sandbox Code Playgroud)
我知道我可以这样做:
var item = await Task.Run(() => blockingCollection.Take());
Run Code Online (Sandbox Code Playgroud)
但是有点杀死了整个想法,因为另一个线程ThreadPool被阻止了.
还有其他选择吗?
Ste*_*ary 77
我知道有三种选择.
第一个Read来自TPL Dataflow.如果您只有一个消费者,您可以使用Write或BufferBlock<T>,或者只是将其链接到OutputAvailableAsync.有关更多信息,请参阅我的博客.
另外两个是我创建的类型,可以在我的AsyncEx库中找到.
ReceiveAsync是ActionBlock<T>近似等价的AsyncCollection<T>,能够包装并发生产者/消费者集合,如async或BlockingCollection<T>.您可以使用ConcurrentQueue<T>异步使用集合中的项目.有关更多信息,请参阅我的博客.
ConcurrentBag<T>是一个更便携TakeAsync兼容的生产者/消费者队列.您可以使用AsyncProducerConsumerQueue<T>异步使用队列中的项目.有关更多信息,请参阅我的博客.
所有这三种选择都允许同步和异步放置和接受.
Joh*_*ren 17
......或者你可以这样做:
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class AsyncQueue<T>
{
private readonly SemaphoreSlim _sem;
private readonly ConcurrentQueue<T> _que;
public AsyncQueue()
{
_sem = new SemaphoreSlim(0);
_que = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
_que.Enqueue(item);
_sem.Release();
}
public void EnqueueRange(IEnumerable<T> source)
{
var n = 0;
foreach (var item in source)
{
_que.Enqueue(item);
n++;
}
_sem.Release(n);
}
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
{
for (; ; )
{
await _sem.WaitAsync(cancellationToken);
T item;
if (_que.TryDequeue(out item))
{
return item;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
简单,功能齐全的异步FIFO队列.
注意:
SemaphoreSlim.WaitAsync之前在.NET 4.5中添加了,这并不是那么简单.
这是一个BlockingCollection支持等待的非常基本的实现,但缺少许多功能。它使用AsyncEnumerable库,这使得 8.0 之前的 C# 版本可以进行异步枚举。
public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
private Queue<T> _queue = new Queue<T>();
private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
private int _consumersCount = 0;
private bool _isAddingCompleted;
public void Add(T item)
{
lock (_queue)
{
if (_isAddingCompleted) throw new InvalidOperationException();
_queue.Enqueue(item);
}
_semaphore.Release();
}
public void CompleteAdding()
{
lock (_queue)
{
if (_isAddingCompleted) return;
_isAddingCompleted = true;
if (_consumersCount > 0) _semaphore.Release(_consumersCount);
}
}
public IAsyncEnumerable<T> GetConsumingEnumerable()
{
lock (_queue) _consumersCount++;
return new AsyncEnumerable<T>(async yield =>
{
while (true)
{
lock (_queue)
{
if (_queue.Count == 0 && _isAddingCompleted) break;
}
await _semaphore.WaitAsync();
bool hasItem;
T item = default;
lock (_queue)
{
hasItem = _queue.Count > 0;
if (hasItem) item = _queue.Dequeue();
}
if (hasItem) await yield.ReturnAsync(item);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
用法示例:
var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(100);
abc.Add(i);
}
abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
await abc.GetConsumingEnumerable().ForEachAsync(async item =>
{
await Task.Delay(200);
await Console.Out.WriteAsync(item + " ");
});
});
await Task.WhenAll(producer, consumer);
Run Code Online (Sandbox Code Playgroud)
输出:
1 2 3 4 5 6 7 8 9 10
更新:随着 C# 8 的发布,异步枚举已成为一种内置语言功能。所需的类 ( IAsyncEnumerable, IAsyncEnumerator) 嵌入在 .NET Core 3.0 中,并作为 .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces )的包提供。
这是一个替代GetConsumingEnumerable实现,具有新的 C# 8 语法:
public async IAsyncEnumerable<T> GetConsumingEnumerable()
{
lock (_queue) _consumersCount++;
while (true)
{
lock (_queue)
{
if (_queue.Count == 0 && _isAddingCompleted) break;
}
await _semaphore.WaitAsync();
bool hasItem;
T item = default;
lock (_queue)
{
hasItem = _queue.Count > 0;
if (hasItem) item = _queue.Dequeue();
}
if (hasItem) yield return item;
}
}
Run Code Online (Sandbox Code Playgroud)
注意await和yield在同一方法中的共存。
使用示例(C# 8):
var consumer = Task.Run(async () =>
{
await foreach (var item in abc.GetConsumingEnumerable())
{
await Task.Delay(200);
await Console.Out.WriteAsync(item + " ");
}
});
Run Code Online (Sandbox Code Playgroud)
请注意await前foreach。