pow*_*tte -2 c# task-parallel-library
我有一组需要一次又一次并行处理的项目.
我可以做这个:
while (true)
{
Parallel.ForEach(sources, source =>
{
// do lots of work with source
})
}
Run Code Online (Sandbox Code Playgroud)
但问题是,如果一个源比其他源需要更长的时间,它将有效地挂起while循环,我想重新处理列表中的每个项目,而无需等待其他项目完成.
我怎么能做到这一点?为每个拥有循环的源代码启动任务,还是有更优雅的方式来执行此操作?
使用ConcurrentQueue你可以很简单地实现你正在寻找的东西.提供以下示例代码:
public Task CreateDownloader(ConcurrentQueue<string> queue, CancellationToken token,
TimeSpan interval, Action<string, string> continuation)
{
return Task.Run(async () =>
{
while (!token.IsCancellationRequested)
{
string url;
if (queue.TryDequeue(out url))
{
try
{
var result = await MockDownloader.Download(url);
continuation(url, result);
}
catch (Exception)
{
// Implement exception handling and logging here.
// Optionally, you can also pass in how to handle these.
}
queue.Enqueue(url);
}
await Task.Delay(interval);
}
});
}
Run Code Online (Sandbox Code Playgroud)
void Main()
{
var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var maxConcurrentDownloads = 4;
var sources = new ConcurrentQueue<string>(
new[] { "foo", "hanging", "bar" });
var tasks = Enumerable.Range(0, maxConcurrentDownloads)
.Select(_ => CreateDownloader(
// The list (queue) of URLs
sources,
// Cancellation token for clean shut down
cancellationToken,
// Time between downloads
TimeSpan.FromMilliseconds(100),
// The action taken for each download
(string url, string result) => Console.WriteLine($"{url} downloaded")));
Console.WriteLine("Download commenced, press any key to initiate shutdown");
Console.ReadKey(true);
Task.WaitAll(tasks.ToArray());
}
Run Code Online (Sandbox Code Playgroud)
public static class MockDownloader
{
static Random rnd = new Random();
public async static Task<string> Download(string url)
{
if (url == "hanging")
// wait for 5 seconds
await Task.Delay(TimeSpan.FromSeconds(2));
else
// wait for 500 to 700 ms
await Task.Delay(TimeSpan.FromMilliseconds(500 + rnd.Next(0, 200)));
return $"{url} done";
}
}
Run Code Online (Sandbox Code Playgroud)
这并不完美,但它展示了如何ConcurrentQueue以一种支持您需求的方式使用a .
其他方法是使用IObservable并Observable从反应扩展(Rx)的库.
您还可以Dataflow在TPL中进行利益研究.