是否有异步BlockingCollection <T>?

avo*_*avo 72 .net c# collections task-parallel-library async-await

我想awaitBlockingCollection<T>.Take()异步的结果,所以我不阻止线程.寻找这样的事情:

var item = await blockingCollection.TakeAsync();
Run Code Online (Sandbox Code Playgroud)

我知道我可以这样做:

var item = await Task.Run(() => blockingCollection.Take());
Run Code Online (Sandbox Code Playgroud)

但是有点杀死了整个想法,因为另一个线程ThreadPool被阻止了.

还有其他选择吗?

Ste*_*ary 77

我知道有三种选择.

第一个Read来自TPL Dataflow.如果您只有一个消费者,您可以使用WriteBufferBlock<T>,或者只是将其链接到OutputAvailableAsync.有关更多信息,请参阅我的博客.

另外两个是我创建的类型,可以在我的AsyncEx库中找到.

ReceiveAsyncActionBlock<T>近似等价的AsyncCollection<T>,能够包装并发生产者/消费者集合,如asyncBlockingCollection<T>.您可以使用ConcurrentQueue<T>异步使用集合中的项目.有关更多信息,请参阅我的博客.

ConcurrentBag<T>是一个更便携TakeAsync兼容的生产者/消费者队列.您可以使用AsyncProducerConsumerQueue<T>异步使用队列中的项目.有关更多信息,请参阅我的博客.

所有这三种选择都允许同步和异步放置和接受.

  • CodePlex最终关闭时的Git Hub链接:https://github.com/StephenCleary/AsyncEx (8认同)
  • @TheodorZoulias:因为“尝试”对不同的人来说意味着不同的事情。我正在考虑添加一个“Try”方法,但它实际上与原始方法具有不同的语义。还考虑在未来版本中支持异步流,这绝对是支持时的最佳消费方法。 (2认同)

Joh*_*ren 17

......或者你可以这样做:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        _sem.Release(n);
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

简单,功能齐全的异步FIFO队列.

注意:SemaphoreSlim.WaitAsync之前在.NET 4.5中添加了,这并不是那么简单.

  • 无限的“for”有什么用?如果信号量被释放,队列中至少有一项要出队,不是吗? (3认同)
  • @Blendester,如果多个消费者被阻止,可能会出现竞争条件。我们无法确定是否至少存在两个竞争的消费者,并且我们不知道他们是否都能够在对某个项目进行出队之前醒来。在发生竞争的情况下,如果一个未能成功出队,它将返回睡眠状态并等待另一个信号。 (3认同)
  • 这是一个阻塞集合,`TryDequeue` 的语义是,返回一个值,或者根本不返回。从技术上讲,如果您有超过 1 个读取器,则在任何其他读取器完全唤醒之前,同一个读取器可以消耗两个(或更多)项目。成功的“WaitAsync”只是一个信号,表明队列中可能有项目可供使用,而不是保证。 (2认同)

The*_*ias 5

这是一个BlockingCollection支持等待的非常基本的实现,但缺少许多功能。它使用AsyncEnumerable库,这使得 8.0 之前的 C# 版本可以进行异步枚举。

public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
    private Queue<T> _queue = new Queue<T>();
    private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private int _consumersCount = 0;
    private bool _isAddingCompleted;

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_isAddingCompleted) throw new InvalidOperationException();
            _queue.Enqueue(item);
        }
        _semaphore.Release();
    }

    public void CompleteAdding()
    {
        lock (_queue)
        {
            if (_isAddingCompleted) return;
            _isAddingCompleted = true;
            if (_consumersCount > 0) _semaphore.Release(_consumersCount);
        }
    }

    public IAsyncEnumerable<T> GetConsumingEnumerable()
    {
        lock (_queue) _consumersCount++;
        return new AsyncEnumerable<T>(async yield =>
        {
            while (true)
            {
                lock (_queue)
                {
                    if (_queue.Count == 0 && _isAddingCompleted) break;
                }
                await _semaphore.WaitAsync();
                bool hasItem;
                T item = default;
                lock (_queue)
                {
                    hasItem = _queue.Count > 0;
                    if (hasItem) item = _queue.Dequeue();
                }
                if (hasItem) await yield.ReturnAsync(item);
            }
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

用法示例:

var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(100);
        abc.Add(i);
    }
    abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
    await abc.GetConsumingEnumerable().ForEachAsync(async item =>
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    });
});
await Task.WhenAll(producer, consumer);
Run Code Online (Sandbox Code Playgroud)

输出:

1 2 3 4 5 6 7 8 9 10


更新:随着 C# 8 的发布,异步枚举已成为一种内置语言功能。所需的类 ( IAsyncEnumerable, IAsyncEnumerator) 嵌入在 .NET Core 3.0 中,并作为 .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces )的包提供。

这是一个替代GetConsumingEnumerable实现,具有新的 C# 8 语法:

public async IAsyncEnumerable<T> GetConsumingEnumerable()
{
    lock (_queue) _consumersCount++;
    while (true)
    {
        lock (_queue)
        {
            if (_queue.Count == 0 && _isAddingCompleted) break;
        }
        await _semaphore.WaitAsync();
        bool hasItem;
        T item = default;
        lock (_queue)
        {
            hasItem = _queue.Count > 0;
            if (hasItem) item = _queue.Dequeue();
        }
        if (hasItem) yield return item;
    }
}
Run Code Online (Sandbox Code Playgroud)

注意awaityield在同一方法中的共存。

使用示例(C# 8):

var consumer = Task.Run(async () =>
{
    await foreach (var item in abc.GetConsumingEnumerable())
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    }
});
Run Code Online (Sandbox Code Playgroud)

请注意awaitforeach

  • 事后看来,我现在认为类名“AsyncBlockingCollection”是无意义的。某些东西不能同时是异步和阻塞的,因为这两个概念是完全相反的! (2认同)
  • 但它仍然是 BlockingCollection 的异步版本:) (2认同)