Jos*_*ant 52 c# throttling semaphore async-await tpl-dataflow
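I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.
Say you have 1000 URLs and you only want 50 requests open at a time; as soon as one request completes, you open a connection to the next URL in the list. That way there are never more than 50 connections open at once, until the URL list is exhausted.
I would also like to make use of a given number of threads, if possible.
I came up with an extension method, ThrottleTasksAsync, that does what I want. Is there a simpler solution already out there? I would assume this is a common scenario.
Usage: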
class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}
Here is the code:
static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
        var semaphore = new SemaphoreSlim(maxConcurrentTasks);
        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }
            blockingQueue.CompleteAdding();
        });
        var taskList = new List<Task<Result_T>>();
        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;
            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);
                            // Release the semaphore
                            semaphore.Release();
                            return tsk.Result;
                        }
                    )
                );
            }
        });
        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }
    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}
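The method uses BlockingCollection and SemaphoreSlim to make it work. The throttler runs on one thread, and all the async tasks run on another. To achieve parallelism, I added a maxDegreeOfParallelism parameter that is passed to a Parallel.ForEach loop repurposed as a while loop.
The old version was: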
foreach (var master in ...)
{
    var details = ...;
    // ParallelOptions is the second argument of Parallel.ForEach; the body comes last.
    Parallel.ForEach(details, new ParallelOptions { MaxDegreeOfParallelism = 15 }, detail => {
        // Process each detail record here
    });
    // Perform the final batch updates here
}
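But the thread pool gets exhausted quickly, and you can't use async/await.
Bonus:
To get around the problem in BlockingCollection where Take() throws an exception once CompleteAdding() has been called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection, since TryTake wouldn't block. Is there a better way? Ideally, there would be a TakeAsync method.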
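For reference, one existing way to consume a BlockingCollection until it is completed, without a polling timeout and without catching the exception from Take(), is GetConsumingEnumerable(). The sketch below is only an illustration: the class and method names (ConsumingEnumerableDemo, Drain, Run) are made up for this example, and it still blocks the consuming thread, so it is not the TakeAsync asked about above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ConsumingEnumerableDemo
{
    // Hypothetical helper: takes items until CompleteAdding() has been called
    // and the collection is empty. No timeout, no exception handling needed.
    public static List<int> Drain(BlockingCollection<int> queue)
    {
        var seen = new List<int>();
        // Blocks while the collection is empty; the loop ends once it is completed.
        foreach (var item in queue.GetConsumingEnumerable())
            seen.Add(item);
        return seen;
    }

    public static List<int> Run()
    {
        using (var queue = new BlockingCollection<int>())
        {
            // Producer adds five items on another thread, then marks completion.
            var producer = Task.Run(() =>
            {
                for (var i = 1; i <= 5; i++)
                    queue.Add(i);
                queue.CompleteAdding();
            });
            var result = Drain(queue);
            producer.Wait();
            return result;
        }
    }
}
```

Calling ConsumingEnumerableDemo.Run() returns the five produced items once the producer has finished.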
dca*_*tro 55
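As suggested, use TPL Dataflow.
A TransformBlock<TInput, TOutput> may be what you're looking for.
You set MaxDegreeOfParallelism to limit how many strings can be transformed in parallel (i.e., how many URLs can be downloaded at once). You then post the URLs to the block; when you're done, you tell the block that no more items are coming and fetch the responses.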
var downloader = new TransformBlock<string, HttpResponse>(
    url => Download(url),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
);
var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);
foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);
downloader.Complete();
await downloader.Completion;
IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}
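Note: the TransformBlock buffers both its input and its output. Why, then, do we need to link it to a BufferBlock?
Because the TransformBlock won't complete until all of its items (HttpResponse) have been consumed, so await downloader.Completion would hang. Instead, we let the downloader forward all of its output to a dedicated buffer block, then wait for the downloader to complete and inspect the buffer block.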
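To see the same shape run end to end, here is a minimal sketch that swaps the download for a Task.Delay and records how many transforms are in flight at once. The names TransformBlockDemo, RunAsync, itemCount and maxParallel are assumptions made for this illustration, not part of the answer's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class TransformBlockDemo
{
    // Returns the number of results and the highest number of transforms
    // that were observed running at the same time.
    public static async Task<(int Count, int Peak)> RunAsync(int itemCount, int maxParallel)
    {
        int running = 0, peak = 0;
        var gate = new object();

        var worker = new TransformBlock<int, int>(async n =>
        {
            lock (gate) { running++; if (running > peak) peak = running; }
            await Task.Delay(20); // stand-in for the real download
            lock (gate) { running--; }
            return n * 2;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        // Forward all output to a buffer so the worker's Completion can finish.
        var buffer = new BufferBlock<int>();
        worker.LinkTo(buffer);

        for (var i = 0; i < itemCount; i++)
            worker.Post(i);

        worker.Complete();
        await worker.Completion;

        buffer.TryReceiveAll(out IList<int> results);
        return (results?.Count ?? 0, peak);
    }
}
```

With RunAsync(20, 4), all 20 results should end up in the buffer, and the observed peak should not exceed 4.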
nos*_*tio 45
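"Say you have 1000 URLs and you only want 50 requests open at a time; as soon as one request completes, you open a connection to the next URL in the list. That way there are never more than 50 connections open at once, until the URL list is exhausted."
The following simple solution has surfaced many times here on SO. It doesn't use blocking code and doesn't create threads explicitly, so it scales very well: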
const int MAX_DOWNLOADS = 50;
static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });
        await Task.WhenAll(tasks);
    }
}
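To check that this pattern really caps concurrency, here is a minimal sketch that replaces the HTTP call with Task.Delay and tracks the peak number of operations in flight. SemaphoreThrottleDemo, RunAsync, itemCount and maxConcurrent are names invented for this illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class SemaphoreThrottleDemo
{
    // Runs itemCount fake "downloads" with at most maxConcurrent in flight
    // and returns the highest in-flight count that was observed.
    public static async Task<int> RunAsync(int itemCount, int maxConcurrent)
    {
        int inFlight = 0, peak = 0;
        var gate = new object();

        using (var semaphore = new SemaphoreSlim(maxConcurrent))
        {
            var tasks = Enumerable.Range(0, itemCount).Select(async _ =>
            {
                await semaphore.WaitAsync();
                try
                {
                    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
                    await Task.Delay(20); // stand-in for httpClient.GetStringAsync(url)
                    lock (gate) { inFlight--; }
                }
                finally
                {
                    semaphore.Release();
                }
            }).ToArray(); // materialize so each task is started exactly once

            await Task.WhenAll(tasks);
        }
        return peak;
    }
}
```

All the tasks are created up front, but each one waits asynchronously on the semaphore, so no thread is blocked while it waits for its turn.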
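The thing is, the processing of the downloaded data should be done on a different pipeline, with a different level of parallelism, especially if the processing is CPU-bound.
For example, you'd probably want 4 threads processing data concurrently (the number of CPU cores) and up to 50 pending requests for more data (which don't use threads at all). AFAICT, this is not what your code is currently doing.
That's where TPL Dataflow or Rx may come in handy as a preferred solution. Yet it is certainly possible to implement something like this with plain TPL. Note that the only blocking code here is the code doing the actual data processing inside Task.Run: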
const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;
// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();
    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simulate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }
    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list
        lock (_lock)
            _pending.Remove(task);
    }
    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}
// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });
        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}
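As requested, here's the code I ended up going with.
The work is set up in a master-detail configuration, and each master is processed as a batch. Each unit of work is queued up like this: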
var success = true;
// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}
// Finished sending master records
masterBuffer.Complete();
// Now, wait for all the batches to complete.
await batchAction.Completion;
return success;
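Masters are buffered one at a time, so that work is left for other outside processes. The details for each master are dispatched for work via the masterTransform TransformManyBlock. A BatchedJoinBlock is also created to collect the details in one batch.
The actual work is done in the detailTransform TransformBlock, asynchronously, 150 at a time. BoundedCapacity is set to 300 so that too many masters don't get buffered at the beginning of the chain, while still leaving room for enough detail records to be queued to keep 150 records in process at a time. The block outputs an object to its targets, because the output is filtered across the links depending on whether it is a Detail or an Exception.
The batchAction ActionBlock collects the output from all the batches and performs bulk database updates, error logging, etc. for each batch.
There will be several BatchedJoinBlocks, one for each master. Since each ISourceBlock outputs sequentially and each batch accepts only the number of detail records associated with one master, the batches are processed in order. Each block outputs only one group and is unlinked on completion. Only the last batch block propagates its completion to the final ActionBlock.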
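The filtering and batching described above can be tried in isolation. The following is a minimal sketch, not the production network: it uses ints instead of Detail records, treats odd inputs as failures, and the names BatchedJoinDemo and RunAsync are made up for this illustration. It shows predicate links routing by runtime type into the two targets of a BatchedJoinBlock, which emits one tuple once the combined count reaches the batch size.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BatchedJoinDemo
{
    // Sends the inputs through a transform that yields either a boxed int or
    // an Exception, then gathers everything in one batch of inputs.Length items.
    public static async Task<(int Ok, int Failed)> RunAsync(int[] inputs)
    {
        // Assumption for the demo: even numbers succeed, odd numbers "fail".
        var transform = new TransformBlock<int, object>(n =>
            n % 2 == 0 ? (object)n : new InvalidOperationException("odd input " + n));

        // One group only; the block completes after it has produced that group.
        var batch = new BatchedJoinBlock<object, object>(
            inputs.Length, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

        // Route by runtime type: successes to Target1, failures to Target2.
        transform.LinkTo(batch.Target1, item => !(item is Exception));
        transform.LinkTo(batch.Target2, item => item is Exception);

        foreach (var n in inputs)
            transform.Post(n);
        transform.Complete();

        // The tuple is produced once Target1 and Target2 together hold inputs.Length items.
        Tuple<IList<object>, IList<object>> group = await batch.ReceiveAsync();
        return (group.Item1.Count, group.Item2.Count);
    }
}
```

For the inputs 1 through 5, the two even values land in Item1 and the three exceptions in Item2.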
The dataflow network:
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;
// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });
// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);
    // Filter the master records based on some criteria here
    var filteredRecords = records;
    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;
    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });
    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });
    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });
    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();
        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });
// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();
    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });