在C#5中表示异步序列

con*_*tor 19 c# asynchronous

你应该如何使用C#5 async来表示一系列异步任务?例如,如果我们想从服务器下载编号文件并在我们获取它时返回每个文件,我们如何实现这样的方法?

public async IEnumerable<File> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    for (int i = 0; i++; ) {
        yield return await DownloadFile(string.Format(format, i));
    }
}
Run Code Online (Sandbox Code Playgroud)

Ste*_*ary 5

真正的序列不能直接与async/一起使用await,因为任务只返回单个值.您需要一个实际的可枚举类型,例如IAsyncEnumerator<T>Ix-Async(或AsyncEx)中.该Channel9视频中IAsyncEnumerator<T>描述了该设计.


svi*_*ick 5

在我看来,你想要一些非常相似的东西BlockingCollection<T>,它使用Tasks而await不是阻塞.

具体来说,你可以添加到没有阻止或等待的东西.但是,当您尝试删除当前没有可用的项目时,您可以await直到某个项目可用.

公共接口可能如下所示:

public class AsyncQueue<T>
{
    public bool IsCompleted { get; }

    public Task<T> DequeueAsync();

    public void Enqueue(T item);

    public void FinishAdding();
}
Run Code Online (Sandbox Code Playgroud)

FinishAdding() 是必要的,这样我们就知道什么时候结束了.

有了这个,你的代码看起来像这样(m_queueAsyncQueue<File>):

var tasks = Enumerable.Range(0, 10)
    .Select(i => DownloadAndEnqueue(i))
    .ToArray();

Task.WhenAll(tasks).ContinueWith(t => m_queue.FinishAdding());

…

static async Task DownloadAndEnqueue(string url)
{
    m_queue.Enqueue(await DownloadFile(url));
}
Run Code Online (Sandbox Code Playgroud)

它并不像你想象的那样好,但应该有效.

并执行AsyncQueue<T>?有两个队列.一个是完成的工作,尚未出现.另一个是Tasks(实际上TaskCompletionSource<T>)已经出列,但是还没有任何结果.

当您出队并且队列中有一些已完成的工作时,只需从那里返回工作(使用Task.FromResult()).如果队列为空,则创建new Task,将其添加到另一个队列并返回它.

当你排队完成一些已完成的工作并且Task队列中有一些s时,删除一个并使用我们现在拥有的结果完成它.如果Task队列为空,请将工作添加到第一个队列.

有了这个,您可以根据需要多次出列和入队,并且它可以正常工作.如果您知道不会有任何新作品,请致电FinishAdding().如果有任何等待Task,他们将抛出异常.

换一种说法:

public class AsyncQueue<T>
{
    private readonly object m_lock = new object();

    private bool m_finishedAdding = false;

    private readonly Queue<T> m_overflowQueue = new Queue<T>();

    private readonly Queue<TaskCompletionSource<T>> m_underflowQueue =
        new Queue<TaskCompletionSource<T>>();

    public bool IsCompleted
    {
        get { return m_finishedAdding && m_overflowQueue.Count == 0; }
    }

    public Task<T> DequeueAsync()
    {
        Task<T> result;
        lock (m_lock)
        {
            if (m_overflowQueue.Count > 0)
                result = Task.FromResult(m_overflowQueue.Dequeue());
            else if (!m_finishedAdding)
            {
                var tcs = new TaskCompletionSource<T>();
                m_underflowQueue.Enqueue(tcs);
                result = tcs.Task;
            }
            else
                throw new InvalidOperationException();
        }
        return result;
    }

    public void Enqueue(T item)
    {
        lock (m_lock)
        {
            if (m_finishedAdding)
                throw new InvalidOperationException();

            if (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetResult(item);
            }
            else
                m_overflowQueue.Enqueue(item);
        }
    }

    public void FinishAdding()
    {
        lock (m_lock)
        {
            m_finishedAdding = true;

            while (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetException(new InvalidOperationException());
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如果您想限制工作队列的大小(从而限制生产者,如果它们太快),您也可以Enqueue()返回Task,这将需要另一个队列.

  • 如果有人现在正在阅读:只使用TPL Dataflow,它就是这样做的(还有更多). (2认同)