C# 中是否有像 ConcurrentQueue 这样的数据结构,它允许我等待一个空队列,直到添加一个项目?

Mar*_*had 2 c# queue concurrency asynchronous

我正在寻找像 ConcurrentQueue 这样的对象,如果队列为空,它允许我等待出队操作,因此我可以执行如下操作:

public static async Task ServiceLoop() {

    var awaitedQueue = new AwaitedQueue<int>();

    while (!cancelled) {
        var item = await awaitableQueue.Dequeue();
        Console.WriteLine(item);
    }

}
Run Code Online (Sandbox Code Playgroud)

我已经编写了以下类,但是如果在调用 Dequeue 和新的等待者入队之间将一个项目添加到队列中,则会发生可怕的事情。

public class AwaitedQueue<T> : IDisposable {

    ConcurrentQueue<TaskCompletionSource<T>> awaiters = new ConcurrentQueue<TaskCompletionSource<T>>();

    ConcurrentQueue<T> items = new ConcurrentQueue<T>();

    public AwaitedQueue() { }

    public void Enqueue(T item) {
        if (!awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) {
            this.items.Enqueue(item);
        } else {
            awaiter.SetResult(item);
        }
    }

    public async Task<T> Dequeue() {
        if (items.TryDequeue(out T item)) {
            return item;
        } else {
            // If an item is enqueued between this call to create a new TaskCompletionSource.
            var awaiter = new TaskCompletionSource<T>();
            // And this call to actually enqueue, I believe it will cause me problems.
            awaiters.Enqueue(awaiter);
            return await awaiter.Task;
        }
    }

    public void Dispose() {
        while (awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) {
            awaiter.SetCanceled();
            awaiter.Task.Wait();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我确信这个概念的强大且经过充分测试的实现已经存在,但我不知道我需要在谷歌中输入哪种英语单词组合才能找到它。

Ste*_*ary 9

有一个现代的解决方案:Channels。“通道”是异步生产者/消费者队列。

通道也有“完成”的概念,所以你可以完成通道而不是有一个cancelled标志。

用法:

public static async Task ServiceLoop() {
  var awaitedQueue = Channel.CreateUnbounded<int>();
  var queueReader = awaitedQueue.Reader;

  while (await queueReader.WaitToReadAsync())
  {
    while (queueReader.TryRead(out var item))
    {
      Console.WriteLine(item);
    }
  }
}
Run Code Online (Sandbox Code Playgroud)