X X*_*X X 2 c# parallel-processing task-parallel-library async-await
我在一个简单的例子中描述了我的问题,然后描述了一个更接近的问题.
想象一下我们在box1中有n个项目[i1,i2,i3,i4,...,in],我们有一个box2可以处理m个项目(m通常远小于n).每个项目所需的时间不同.我想总是做m个工作项目,直到所有项目都进行.
一个更接近的问题是,例如,你有一个n个字符串(URL地址)的list1文件,我们希望有一个系统同时下载m个文件(例如通过httpclient.getAsync()方法).每当m个项目中的一个的下载完成时,必须尽快替换list1中的另一个剩余项目,并且必须对其进行计数,直到所有List1项目都继续进行.(n和m的数量由运行时输入的用户指定)
怎么做到这一点?
您应该查看TPL Dataflow,将System.Threading.Tasks.Dataflow NuGet包添加到您的项目中,然后您想要的就像
private static HttpClient _client = new HttpClient();
public async Task<List<MyClass>> ProcessDownloads(IEnumerable<string> uris,
int concurrentDownloads)
{
var result = new List<MyClass>();
var downloadData = new TransformBlock<string, string>(async uri =>
{
return await _client.GetStringAsync(uri); //GetStringAsync is a thread safe method.
}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});
var processData = new TransformBlock<string, MyClass>(
json => JsonConvert.DeserializeObject<MyClass>(json),
new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded});
var collectData = new ActionBlock<MyClass>(
data => result.Add(data)); //When you don't specifiy options dataflow processes items one at a time.
//Set up the chain of blocks, have it call `.Complete()` on the next block when the current block finishes processing it's last item.
downloadData.LinkTo(processData, new DataflowLinkOptions {PropagateCompletion = true});
processData.LinkTo(collectData, new DataflowLinkOptions {PropagateCompletion = true});
//Load the data in to the first transform block to start off the process.
foreach (var uri in uris)
{
await downloadData.SendAsync(uri).ConfigureAwait(false);
}
downloadData.Complete(); //Signal you are done adding data.
//Wait for the last object to be added to the list.
await collectData.Completion.ConfigureAwait(false);
return result;
}
Run Code Online (Sandbox Code Playgroud)
在上面的代码唯一concurrentDownloads号码HttpClients的将被激活,在任何给定时间,无限线程将被处理接收到的字符串和在物体转动它们,和一个单独的线程将采取那些对象并将其添加到列表.
更新:这是一个简单的例子,它只能满足你在问题中提出的要求
private static HttpClient _client = new HttpClient();
public void ProcessDownloads(IEnumerable<string> uris, int concurrentDownloads)
{
var downloadData = new ActionBlock<string>(async uri =>
{
var response = await _client.GetAsync(uri); //GetAsync is a thread safe method.
//do something with response here.
}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});
foreach (var uri in uris)
{
downloadData.Post(uri);
}
downloadData.Complete();
downloadData.Completion.Wait();
}
Run Code Online (Sandbox Code Playgroud)
这是您可以使用的通用方法.
当你调用这个TIn将是字符串(URL地址),asyncProcessor将是你的异步方法,它将URL地址作为输入并返回一个任务.
此方法使用的SlimSemaphore将实时仅允许n个并发异步I/O请求,一旦完成另一个请求将执行.像滑动窗口模式的东西.
public static Task ForEachAsync<TIn>(
IEnumerable<TIn> inputEnumerable,
Func<TIn, Task> asyncProcessor,
int? maxDegreeOfParallelism = null)
{
int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);
IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
{
await throttler.WaitAsync().ConfigureAwait(false);
try
{
await asyncProcessor(input).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
return Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
611 次 |
| 最近记录: |