// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };
// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
var client = new HttpClient();
var html = await client.GetStringAsync(url);
});
Run Code Online (Sandbox Code Playgroud)
这是问题所在,它会同时启动1000多个Web请求.有没有一种简单的方法来限制这些异步http请求的并发数量?这样在任何给定时间都不会下载超过20个网页.如何以最有效的方式做到这一点?
我想并行处理一个集合,但是我在实现它时遇到了麻烦,因此我希望得到一些帮助.
如果我想在并行循环的lambda中调用C#中标记为async的方法,则会出现问题.例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
Run Code Online (Sandbox Code Playgroud)
计数为0时会出现问题,因为创建的所有线程实际上只是后台线程,并且Parallel.ForEach调用不等待完成.如果我删除async关键字,该方法如下所示:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
Run Code Online (Sandbox Code Playgroud)
它工作,但它完全禁用等待聪明,我必须做一些手动异常处理..(为简洁起见删除).
如何实现一个Parallel.ForEach在lambda中使用await关键字的循环?可能吗?
Parallel.ForEach方法的原型采用Action<T>as参数,但我希望它等待我的异步lambda.
编辑
我已经改变了问题的标题,以反映我的问题,但也回答了如何轻松实现这一目标.
我试图使第二种方法返回Task<TResult>而不是Task在第一种方法中,但是由于尝试修复它,我得到了一连串的错误.
return之前加了await body(partition.Current);return null下面添加了Task.Run到Task.Run<TResult>,但没有成功.我该如何解决?
第一种方法来自http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx,第二种方法是我正在尝试创建的重载.
public static class Extensions
{
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
public static Task ForEachAsync<T, TResult>(this IEnumerable<T> source, int dop, Func<T, Task<TResult>> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop) …Run Code Online (Sandbox Code Playgroud) 我必须在多个异步任务完成后立即使用它们的输出。
这些方法中的任何一种都会有合理的性能差异吗?
public async Task<List<Baz>> MyFunctionAsync(List<Foo> FooList) {
results = new List<Baz>();
List<Task<List<Baz>>> tasks = new List<Task<List<Baz>>>();
foreach (Foo foo in FooList) {
tasks.Add(FetchBazListFromFoo(entry));
foreach (Task<List<Baz>> task in tasks) {
results.AddRange(await task);
return results;
}
Run Code Online (Sandbox Code Playgroud)
public async Task<List<Baz>> MyFunctionAsync(List<Foo> FooList) {
results = new List<Baz>();
List<Task<List<Baz>>> tasks = new List<Task<List<Baz>>>();
foreach (Foo foo in FooList) {
tasks.Add(FetchBazListFromFoo(entry));
foreach (List<Baz> bazList in await Task.WhenAll(tasks))
results.AddRange(bazList);
return results;
}
Run Code Online (Sandbox Code Playgroud)
public async Task<List<Baz>> MyFunctionAsync(List<Foo> FooList) {
results = new List<Baz>(); …Run Code Online (Sandbox Code Playgroud) 假设我想发出并行 API post 请求。
在 for 循环中,我可以将 http post 调用附加到任务列表中(使用 Task.Run 调用的每个任务),然后等待所有任务完成使用await Task.WhenAll. 因此,在等待网络请求完成时,控制权将交给调用者。实际上,API 请求将并行发出。
同样,我可以使用Parallel.ForEachAsync它将自动执行WhenAll并将控制权返回给调用者。所以我想问是否ForEachAsync可以替换普通的 for 循环列表(async wait Task.Run)和WhenAll?
希望这里相当简单.我有一个对象集合,每个对象都有一个异步方法,我想调用它并从中收集值.我希望他们能并行运行.我想要实现的目标可以用一行代码来概括:
IEnumerable<TestResult> results = await Task.WhenAll(myCollection.Select(v => v.TestAsync()));
Run Code Online (Sandbox Code Playgroud)
我已经尝试了各种方法来写这个没有成功.有什么想法吗?
下面是Stephen Toub 编写的一个实现ForEachAsync
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
Run Code Online (Sandbox Code Playgroud)
指定 partitionCount 时应考虑哪些因素(dop在本例中)?
硬件是否有所作为(内核数量、可用 RAM 等)?
数据/操作的类型是否影响计数?
我的第一个猜测是在一般情况下设置为dop等于Environment.ProcessorCount,但我的直觉告诉我这可能无关。