你应该如何使用C#5 async来表示一系列异步任务?例如,如果我们想从服务器下载编号文件并在我们获取它时返回每个文件,我们如何实现这样的方法?
public async IEnumerable<File> DownloadPictures() {
const string format = "http://example.com/files/{0}.png";
for (int i = 0; i++; ) {
yield return await DownloadFile(string.Format(format, i));
}
}
Run Code Online (Sandbox Code Playgroud)
真正的序列不能直接与async/一起使用await,因为任务只返回单个值.您需要一个实际的可枚举类型,例如IAsyncEnumerator<T>在Ix-Async(或AsyncEx)中.该Channel9视频中IAsyncEnumerator<T>描述了该设计.
在我看来,你想要一些非常相似的东西BlockingCollection<T>,它使用Tasks而await不是阻塞.
具体来说,你可以添加到没有阻止或等待的东西.但是,当您尝试删除当前没有可用的项目时,您可以await直到某个项目可用.
公共接口可能如下所示:
public class AsyncQueue<T>
{
public bool IsCompleted { get; }
public Task<T> DequeueAsync();
public void Enqueue(T item);
public void FinishAdding();
}
Run Code Online (Sandbox Code Playgroud)
FinishAdding() 是必要的,这样我们就知道什么时候结束了.
有了这个,你的代码看起来像这样(m_queue是AsyncQueue<File>):
var tasks = Enumerable.Range(0, 10)
.Select(i => DownloadAndEnqueue(i))
.ToArray();
Task.WhenAll(tasks).ContinueWith(t => m_queue.FinishAdding());
…
static async Task DownloadAndEnqueue(string url)
{
m_queue.Enqueue(await DownloadFile(url));
}
Run Code Online (Sandbox Code Playgroud)
它并不像你想象的那样好,但应该有效.
并执行AsyncQueue<T>?有两个队列.一个是完成的工作,尚未出现.另一个是Tasks(实际上TaskCompletionSource<T>)已经出列,但是还没有任何结果.
当您出队并且队列中有一些已完成的工作时,只需从那里返回工作(使用Task.FromResult()).如果队列为空,则创建new Task,将其添加到另一个队列并返回它.
当你排队完成一些已完成的工作并且Task队列中有一些s时,删除一个并使用我们现在拥有的结果完成它.如果Task队列为空,请将工作添加到第一个队列.
有了这个,您可以根据需要多次出列和入队,并且它可以正常工作.如果您知道不会有任何新作品,请致电FinishAdding().如果有任何等待Task,他们将抛出异常.
换一种说法:
public class AsyncQueue<T>
{
private readonly object m_lock = new object();
private bool m_finishedAdding = false;
private readonly Queue<T> m_overflowQueue = new Queue<T>();
private readonly Queue<TaskCompletionSource<T>> m_underflowQueue =
new Queue<TaskCompletionSource<T>>();
public bool IsCompleted
{
get { return m_finishedAdding && m_overflowQueue.Count == 0; }
}
public Task<T> DequeueAsync()
{
Task<T> result;
lock (m_lock)
{
if (m_overflowQueue.Count > 0)
result = Task.FromResult(m_overflowQueue.Dequeue());
else if (!m_finishedAdding)
{
var tcs = new TaskCompletionSource<T>();
m_underflowQueue.Enqueue(tcs);
result = tcs.Task;
}
else
throw new InvalidOperationException();
}
return result;
}
public void Enqueue(T item)
{
lock (m_lock)
{
if (m_finishedAdding)
throw new InvalidOperationException();
if (m_underflowQueue.Count > 0)
{
var tcs = m_underflowQueue.Dequeue();
tcs.SetResult(item);
}
else
m_overflowQueue.Enqueue(item);
}
}
public void FinishAdding()
{
lock (m_lock)
{
m_finishedAdding = true;
while (m_underflowQueue.Count > 0)
{
var tcs = m_underflowQueue.Dequeue();
tcs.SetException(new InvalidOperationException());
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果您想限制工作队列的大小(从而限制生产者,如果它们太快),您也可以Enqueue()返回Task,这将需要另一个队列.
| 归档时间: |
|
| 查看次数: |
3974 次 |
| 最近记录: |