Dar*_*g8r 159 c# wcf task-parallel-library async-await parallel.foreach
在metro应用程序中,我需要执行许多WCF调用.有大量的调用,所以我需要在并行循环中进行调用.问题是并行循环在WCF调用完成之前退出.
你会如何重构这个按预期工作?
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
foreach ( var customer in customers )
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)
svi*_*ick 150
背后的整个想法Parallel.ForEach()
是你有一组线程,每个线程处理集合的一部分.正如您所注意到的,这不适用于async
- await
,您希望在异步调用期间释放线程.
你可以通过阻止ForEach()
线程来"修复"它,但这会破坏整个点async
- await
.
你可以做的是使用TPL Dataflow代替Parallel.ForEach()
,它支持异步Task
s.
具体来说,您的代码可以使用TransformBlock
一个Customer
使用async
lambda 将每个id转换为a 来编写.该块可以配置为并行执行.您可以将该块链接到ActionBlock
将每个块写入Customer
控制台的块.设置块网络后,您可以将Post()
每个ID设置为TransformBlock
.
在代码中:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)
虽然你可能想要将并行性限制TransformBlock
为一些小的常量.此外,您可以限制其容量,TransformBlock
并使用异步方式将项添加到其中SendAsync()
,例如,如果集合太大.
与代码(如果有效)相比,一个额外的好处是,只要单个项目完成,写入就会立即开始,而不是等到所有处理完成.
Ste*_*ary 117
svick的答案(像往常一样)很棒.
但是,当您实际需要传输大量数据时,我发现Dataflow更有用.或者当您需要async
兼容的队列时.
在您的情况下,更简单的解决方案是使用async
-style并行性:
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
{
ICustomerRepo repo = new CustomerRepo();
return repo.GetCustomer(i);
});
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)
Oha*_*der 74
使用DataFlow作为svick建议可能有点过分,而Stephen的回答并没有提供控制操作并发性的方法.但是,这可以简单地实现:
public static async Task RunWithMaxDegreeOfConcurrency<T>(
int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
var activeTasks = new List<Task>(maxDegreeOfConcurrency);
foreach (var task in collection.Select(taskFactory))
{
activeTasks.Add(task);
if (activeTasks.Count == maxDegreeOfConcurrency)
{
await Task.WhenAny(activeTasks.ToArray());
//observe exceptions here
activeTasks.RemoveAll(t => t.IsCompleted);
}
}
await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
{
//observe exceptions in a manner consistent with the above
});
}
Run Code Online (Sandbox Code Playgroud)
该ToArray()
呼叫可以通过使用数组,而不是一个列表,并更换完成的任务进行优化,但我怀疑这会挣很多在大多数情况下的差别.OP问题的样本使用情况:
RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
Run Code Online (Sandbox Code Playgroud)
编辑研究员SO用户和TPL Wiz Eli Arbel向我指出了Stephen Toub的相关文章.像往常一样,他的实现既优雅又高效:
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
Run Code Online (Sandbox Code Playgroud)
Ser*_*nov 34
您可以使用新的AsyncEnumerator NuGet包节省工作量,该包在4年前最初发布问题时不存在.它允许您控制并行度:
using System.Collections.Async;
...
await ids.ParallelForEachAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
},
maxDegreeOfParallelism: 10);
Run Code Online (Sandbox Code Playgroud)
免责声明:我是AsyncEnumerator库的作者,该库是开源的并且在MIT下获得许可,我发布此消息只是为了帮助社区.
小智 14
将Parallel.Foreach
a换成a Task.Run()
而不是await
关键字use[yourasyncmethod].Result
(你需要做Task.Run的事情来阻止UI线程)
像这样的东西:
var yourForeachTask = Task.Run(() =>
{
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = repo.GetCustomer(i).Result;
customers.Add(cust);
});
});
await yourForeachTask;
Run Code Online (Sandbox Code Playgroud)
这应该非常高效,并且比使整个TPL Dataflow工作更容易:
var customers = await ids.SelectAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
});
...
public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
var results = new List<TResult>();
var activeTasks = new HashSet<Task<TResult>>();
foreach (var item in source)
{
activeTasks.Add(selector(item));
if (activeTasks.Count >= maxDegreesOfParallelism)
{
var completed = await Task.WhenAny(activeTasks);
activeTasks.Remove(completed);
results.Add(completed.Result);
}
}
results.AddRange(await Task.WhenAll(activeTasks));
return results;
}
Run Code Online (Sandbox Code Playgroud)
一种使用 SemaphoreSlim 并允许设置最大并行度的扩展方法
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item).ContinueWith(res =>
{
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
});
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
Run Code Online (Sandbox Code Playgroud)
示例用法:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
Run Code Online (Sandbox Code Playgroud)
我参加聚会有点晚了,但您可能想考虑使用 GetAwaiter.GetResult() 在同步上下文中运行您的异步代码,但如下所示;
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
// Run this in thread which Parallel library occupied.
var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
customers.Add(cust);
});
Run Code Online (Sandbox Code Playgroud)
在介绍了一堆辅助方法之后,您将能够使用以下简单的语法运行并行查询:
const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
.Split(DegreeOfParallelism)
.SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
.ConfigureAwait(false);
Run Code Online (Sandbox Code Playgroud)
这里发生的事情是:我们将源集合分成 10 个块 ( .Split(DegreeOfParallelism)
),然后运行 10 个任务,每个任务一个一个地处理其项目 ( .SelectManyAsync(...)
) 并将它们合并回一个列表。
值得一提的是,有一种更简单的方法:
double[] result2 = await Enumerable.Range(0, 1000000)
.Select(async i => await CalculateAsync(i).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
Run Code Online (Sandbox Code Playgroud)
但它需要一个预防措施:如果您的源集合太大,它会立即Task
为每个项目安排一个,这可能会导致显着的性能下降。
上述示例中使用的扩展方法如下所示:
public static class CollectionExtensions
{
/// <summary>
/// Splits collection into number of collections of nearly equal size.
/// </summary>
public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
{
if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));
List<T> source = src.ToList();
var sourceIndex = 0;
for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
{
var list = new List<T>();
int itemsLeft = source.Count - targetIndex;
while (slicesCount * list.Count < itemsLeft)
{
list.Add(source[sourceIndex++]);
}
yield return list;
}
}
/// <summary>
/// Takes collection of collections, projects those in parallel and merges results.
/// </summary>
public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
this IEnumerable<IEnumerable<T>> source,
Func<T, Task<TResult>> func)
{
List<TResult>[] slices = await source
.Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
return slices.SelectMany(s => s);
}
/// <summary>Runs selector and awaits results.</summary>
public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
{
List<TResult> result = new List<TResult>();
foreach (TSource source1 in source)
{
TResult result1 = await selector(source1).ConfigureAwait(false);
result.Add(result1);
}
return result;
}
/// <summary>Wraps tasks with Task.WhenAll.</summary>
public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
{
return Task.WhenAll<TResult>(source);
}
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
97314 次 |
最近记录: |