Awaitable task-based queue

spe*_*der 33 c# queue asynchronous async-await .net-4.5

I'm wondering whether an implementation/wrapper for ConcurrentQueue exists, similar to BlockingCollection, where taking from the collection does not block, but is instead asynchronous and causes an async await until an item is placed in the queue.

I've come up with my own implementation, but it doesn't seem to perform as expected. I'm wondering whether I'm reinventing something that already exists.

Here's my implementation:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}

Ste*_*ary 51

I don't know of a lock-free solution, but you can take a look at the new Dataflow library, part of the Async CTP. A simple BufferBlock<T> should suffice, e.g.:

BufferBlock<int> buffer = new BufferBlock<int>();

Production and consumption are most easily done via the extension methods on the dataflow block types.

Production is as simple as:

buffer.Post(13);

and consumption is async-ready:

int item = await buffer.ReceiveAsync();

I recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.
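As a rough end-to-end sketch of the above (the producer loop and item values are illustrative; `BufferBlock<T>` lives in the `System.Threading.Tasks.Dataflow` NuGet package):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

class Program
{
    static async Task Main()
    {
        var buffer = new BufferBlock<int>();

        // Producer: post a few items, then signal that no more are coming.
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 3; i++)
                buffer.Post(i);
            buffer.Complete();
        });

        // Consumer: await items until the block completes and drains.
        while (await buffer.OutputAvailableAsync())
            Console.WriteLine(await buffer.ReceiveAsync());

        await producer;
    }
}
```

Calling `Complete()` lets the consumer loop finish cleanly instead of awaiting forever on an empty block.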

  • And to think I was about to sleep on it! Dataflow looks like a perfect fit for my needs. It seems to bridge the gap between what the TPL offers and what the CCR offered (which I used with great success). It makes me feel positive that the excellent work on the CCR wasn't wasted. This is the right answer (and some shiny new stuff for me to indulge in!). Thanks @StephenCleary. (2)
  • @Fanblade: True, but these days I point people toward `System.Threading.Channels`. Channels are a very efficient and thoroughly modern solution. (2)

Bru*_*ell 14

A simple approach using C# 8.0 IAsyncEnumerable and the Dataflow library

// Instantiate an async queue
var queue = new AsyncQueue<int>();

// Then, loop through the elements of queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach(int i in queue) {
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}

With AsyncQueue implemented as follows:

public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) =>
        _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        await _enumerationSemaphore.WaitAsync();
        try {
            // Return new elements until cancellationToken is triggered.
            while (true) {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        } finally {
            _enumerationSemaphore.Release();
        }

    }
}

  • @valori You can't have an `await` inside a `lock` (3)

kan*_*152 8

There's now an official way to do this: System.Threading.Channels. It's built into the core runtime on .NET Core 3.0 and higher (including .NET 5.0 and 6.0), but it's also available as a NuGet package on .NET Standard 2.0 and 2.1. You can read the documentation here.

var channel = System.Threading.Channels.Channel.CreateUnbounded<int>();

To enqueue work:

// This will succeed and finish synchronously if the channel is unbounded.
channel.Writer.TryWrite(42);

To complete the channel:

channel.Writer.TryComplete();

To read from the channel:

var i = await channel.Reader.ReadAsync();

Or, if you have .NET Core 3.0 or higher:

await foreach (int i in channel.Reader.ReadAllAsync())
{
    // whatever processing on i...
}
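Putting those pieces together, a minimal self-contained sketch (the item values and loop bound are illustrative):

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();

        // Producer: write a few items, then complete the channel.
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 3; i++)
                await channel.Writer.WriteAsync(i);
            channel.Writer.Complete();
        });

        // Consumer: ReadAllAsync ends once the writer completes
        // and all buffered items have been drained.
        await foreach (int i in channel.Reader.ReadAllAsync())
            Console.WriteLine(i);

        await producer;
    }
}
```

Completing the writer is what terminates the `await foreach` loop; without it, the consumer waits indefinitely for more items.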

  • Useful link: [An Introduction to System.Threading.Channels](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/) (3)

Rya*_*yan 7

A simple and easy way to implement this is with a SemaphoreSlim:

public class AwaitableQueue<T>
{
    private SemaphoreSlim semaphore = new SemaphoreSlim(0);
    private readonly object queueLock = new object();
    private Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (queueLock)
        {
            queue.Enqueue(item);
            semaphore.Release();
        }
    }

    public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        semaphore.Wait(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }

    public async Task<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        await semaphore.WaitAsync(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }
}

The beauty of this is that the SemaphoreSlim handles all the complexity of implementing the Wait() and WaitAsync() functionality. The downside is that the queue length is tracked by both the semaphore and the queue itself, and they just magically stay in sync.
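To see the pattern end to end, here is a condensed, self-contained variant of the async path with a short usage demo (class and method names follow the answer above; the timeout value is illustrative). Note that the answer above discards the bool returned by `WaitAsync`, so a timeout would surface as an `InvalidOperationException` from `Queue.Dequeue`; this sketch checks it and throws a `TimeoutException` instead:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AwaitableQueue<T>
{
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
    private readonly object queueLock = new object();
    private readonly Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (queueLock)
        {
            queue.Enqueue(item);
            semaphore.Release(); // one permit per queued item
        }
    }

    public async Task<T> WhenDequeue(TimeSpan timeout, CancellationToken token)
    {
        // The semaphore's count mirrors the queue's length, so a
        // successful wait guarantees Dequeue will find an item.
        if (!await semaphore.WaitAsync(timeout, token))
            throw new TimeoutException();
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }
}

class Demo
{
    static async Task Main()
    {
        var q = new AwaitableQueue<string>();

        // Start awaiting before any item exists...
        Task<string> pending = q.WhenDequeue(TimeSpan.FromSeconds(5), CancellationToken.None);

        // ...then produce one; the awaiter resumes with it.
        q.Enqueue("hello");

        Console.WriteLine(await pending); // hello
    }
}
```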


And*_*res 5

My attempt (it raises an event when a "promise" is created, which an external producer can use to know when to produce more items):

public class AsyncQueue<T>
{
    private ConcurrentQueue<T> _bufferQueue;
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
    private object _syncRoot = new object();

    public AsyncQueue()
    {
        _bufferQueue = new ConcurrentQueue<T>();
        _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
    }

    /// <summary>
    /// Enqueues the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    public void Enqueue(T item)
    {
        TaskCompletionSource<T> promise;
        do
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;                                       
            }
        }
        while (promise != null);

        lock (_syncRoot)
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;
            }

            _bufferQueue.Enqueue(item);
        }            
    }

    /// <summary>
    /// Dequeues the asynchronous.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns></returns>
    public Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        T item;

        if (!_bufferQueue.TryDequeue(out item))
        {
            lock (_syncRoot)
            {
                if (!_bufferQueue.TryDequeue(out item))
                {
                    var promise = new TaskCompletionSource<T>();
                    cancellationToken.Register(() => promise.TrySetCanceled());

                    _promisesQueue.Enqueue(promise);
                    this.PromiseAdded.RaiseEvent(this, EventArgs.Empty);

                    return promise.Task;
                }
            }
        }

        return Task.FromResult(item);
    }

    /// <summary>
    /// Gets a value indicating whether this instance has promises.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
    /// </value>
    public bool HasPromises
    {
        get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; }
    }

    /// <summary>
    /// Occurs when a new promise
    /// is generated by the queue
    /// </summary>
    public event EventHandler PromiseAdded;
}


Archived:

Views: 25,830

Last updated: 6 years, 8 months ago