从阻塞集合中异步获取

Der*_*all 16 c# asynchronous task-parallel-library

我正在使用a BlockingCollection来实现生产者/消费者模式.我有一个异步循环,用于处理集合中要处理的数据,然后客户端可以在很晚的时候访问它.数据包稀疏地到达,我希望在不使用阻塞调用的情况下完成轮询.

从本质上说,我正在寻找的东西像BeginTakeEndTake不阻塞集合中存在,这样我可以在回调利用内部线程池.它BlockingCollection无论如何都不一定是.任何做我需要的东西都会很棒.

这就是我现在所拥有的._bufferedPackets是一个BlockingCollection<byte[]>:

public byte[] Read(int timeout)
{
    byte[] result;
    if (_bufferedPackets.IsCompleted)
    {
        throw new Exception("Out of packets");
    }
    _bufferedPackets.TryTake(out result, timeout);      
    return result;
}
Run Code Online (Sandbox Code Playgroud)

在伪代码中我喜欢这样的东西:

public void Read(int timeout)
{
    _bufferedPackets.BeginTake(result =>
        {
            var bytes = _bufferedPackets.EndTake(result);
            // Process the bytes, or the resuting timeout
        }, timeout, _bufferedPackets);
}
Run Code Online (Sandbox Code Playgroud)

我有什么选择?我不想将任何线程置于等待状态,因为它有很多其他的IO东西需要处理,而且我会很快耗尽线程.

更新:我已经重写了有问题的代码,以不同的方式使用异步进程,基本上根据超时限制内是否有等待请求来交换回调.这样可以正常工作,但如果有一种方法可以做到这一点而不诉诸定时器并交换lambdas可能导致竞争条件并且很难写(并理解),那么它仍然会很棒.我已经用自己的异步队列实现解决了这个问题,但如果有更标准且经过良好测试的选项,它仍然会很棒.

Der*_*all 0

所以看起来没有内置的选项,我出去尽力做我想要的实验。事实证明,为了使这项工作与旧异步模式的其他用户大致相同,需要做很多工作。

public class AsyncQueue<T>
{
    private readonly ConcurrentQueue<T> queue;
    private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue; 

    private class DequeueAsyncResult : IAsyncResult
    {
        public bool IsCompleted { get; set; }
        public WaitHandle AsyncWaitHandle { get; set; }
        public object AsyncState { get; set; }
        public bool CompletedSynchronously { get; set; }
        public T Result { get; set; }

        public AsyncCallback Callback { get; set; }
    }

    public AsyncQueue()
    {
        dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>();
        queue = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        DequeueAsyncResult asyncResult;
        while  (dequeueQueue.TryDequeue(out asyncResult))
        {
            if (!asyncResult.IsCompleted)
            {
                asyncResult.IsCompleted = true;
                asyncResult.Result = item;

                ThreadPool.QueueUserWorkItem(state =>
                {
                    if (asyncResult.Callback != null)
                    {
                        asyncResult.Callback(asyncResult);
                    }
                    else
                    {
                        ((EventWaitHandle) asyncResult.AsyncWaitHandle).Set();
                    }
                });
                return;
            }
        }
        queue.Enqueue(item);
    }

    public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state)
    {
        T result;
        if (queue.TryDequeue(out result))
        {
            var dequeueAsyncResult = new DequeueAsyncResult
            {
                IsCompleted = true, 
                AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset), 
                AsyncState = state, 
                CompletedSynchronously = true, 
                Result = result
            };
            if (null != callback)
            {
                callback(dequeueAsyncResult);
            }
            return dequeueAsyncResult;
        }

        var pendingResult = new DequeueAsyncResult
        {
            AsyncState = state, 
            IsCompleted = false, 
            AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset), 
            CompletedSynchronously = false,
            Callback = callback
        };
        dequeueQueue.Enqueue(pendingResult);
        Timer t = null;
        t = new Timer(_ =>
        {
            if (!pendingResult.IsCompleted)
            {
                pendingResult.IsCompleted = true;
                if (null != callback)
                {
                    callback(pendingResult);
                }
                else
                {
                    ((EventWaitHandle)pendingResult.AsyncWaitHandle).Set();
                }
            }
            t.Dispose();
        }, new object(), timeout, Timeout.Infinite);

        return pendingResult;
    }

    public T EndDequeue(IAsyncResult result)
    {
        var dequeueResult = (DequeueAsyncResult) result;
        return dequeueResult.Result;
    }
}
Run Code Online (Sandbox Code Playgroud)

我不太确定该IsComplete属性的同步,并且我不太热衷于如何dequeueQueue在后续调用中清理 only Enqueue。我也不确定何时是向等待句柄发出信号的正确时间,但这是迄今为止我得到的最佳解决方案。

请不要以任何方式考虑此生产质量代码。我只是想展示如何在不等待锁的情况下保持所有线程旋转的一般要点。我确信这充满了各种边缘情况和错误,但它满足了要求,我想回馈遇到这个问题的人们。