限制异步任务

Jos*_*ant 52 c# throttling semaphore async-await tpl-dataflow

我想运行一堆异步任务,并限制在任何给定时间可以完成的任务数量.

假设您有1000个网址,并且您只希望一次打开50个请求; 但只要一个请求完成,您就会打开与列表中下一个URL的连接.这样,一次只打开50个连接,直到URL列表用完为止.

如果可能的话,我也想利用给定数量的线程.

我提出了一种扩展方法,ThrottleTasksAsync可以实现我想要的功能.那里有更简单的解决方案吗?我认为这是一种常见的情况.

用法:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}
Run Code Online (Sandbox Code Playgroud)

这是代码:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}
Run Code Online (Sandbox Code Playgroud)

该方法利用BlockingCollectionSemaphoreSlim使其工作.限制器在一个线程上运行,所有异步任务在另一个线程上运行.为了实现并行性,我添加了一个maxDegreeOfParallelism参数,该参数被传递给一个Parallel.ForEach重新用作while循环的循环.

旧版本是:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}
Run Code Online (Sandbox Code Playgroud)

但是,线程池快速耗尽,你不能做async/ await.

额外: 为了解决调用时BlockingCollection抛出异常的问题,我正在使用带超时的重载.如果我没有使用超时,它会破坏使用的目的,因为不会阻止.有没有更好的办法?理想情况下,会有一种方法.Take()CompleteAdding()TryTakeTryTakeBlockingCollectionTryTakeTakeAsync

dca*_*tro 55

如建议的那样,使用TPL Dataflow.

A TransformBlock<TInput, TOutput>可能是你正在寻找的.

您可以定义a MaxDegreeOfParallelism来限制并行转换的字符串数(即可以下载多少个URL).然后,您将网址发布到该区块,当您完成后,您告诉该区块您已完成添加项目并获取响应.

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}
Run Code Online (Sandbox Code Playgroud)

注意:TransformBlock缓冲输入和输出.那么,为什么我们需要将它链接到BufferBlock

因为在TransformBlock所有items(HttpResponse)被消耗之前不会完成,并且await downloader.Completion会挂起.相反,我们将其downloader所有输出转发到专用缓冲区块 - 然后我们等待downloader完成,并检查缓冲区块.

  • @JoshWyant使用上面的代码,您只需发布任意数量的URL(使用"SendAsync").Theese将被块缓冲.该块将继续从缓冲区中获取URL,并且每次处理的时间绝不会超过50.然后将结果放入另一个缓冲区.`TransformBlock`缓冲其输入和输出. (2认同)
  • @dcastro 所以,我最终选择了 Dataflow 解决方案。我最初担心的是 `MaxDegreeOfParallelism` 的工作方式与 `Parallel.ForEach` 完全一样,只是创建任意数量的线程来实现并行性。我错了,该参数与 `async` 配合得很好。Tpl.Dataflow 工作得很好。谢谢! (2认同)

nos*_*tio 45

假设您有1000个网址,并且您只希望一次打开50个请求; 但只要一个请求完成,您就会打开与列表中下一个URL的连接.这样,一次只打开50个连接,直到URL列表用完为止.

以下简单的解决方案已在SO上多次浮出水面.它不使用阻塞代码,也不会显式创建线程,因此它可以很好地扩展:

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}
Run Code Online (Sandbox Code Playgroud)

问题是,下载数据的处理应该在不同的管道上完成,具有不同的并行级别,特别是如果它是CPU绑定的处理.

例如,您可能希望有4个线程同时进行数据处理(CPU核心数),并且最多有50个待处理请求需要更多数据(根本不使用线程).AFAICT,这不是您的代码目前正在做的事情.

这就是TPL Dataflow或Rx作为首选解决方案可以派上用场的地方.然而,使用简单的TPL实现类似的东西当然是可能的.注意,这里唯一的阻塞代码是进行实际数据处理的代码Task.Run:

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 如果在点击同一端点时使用.net核心中的默认新HttpClient()进行测试,请务必小心.默认情况下,它限制每个服务器的连接数(在fiddler中将其限制为2),除非您指定新的HttpClient(新的HttpClientHandler {MaxConnectionsPerServer = ...}).这个答案中的所有内容都与宣传的一样,但您仍然可以受到该设置的限制. (3认同)
  • 这是最简单,最直接的答案.这正是我试图做的事情.我的错误是试图在一个单独的线程上运行信号量,但这使得它变得如此简单,并消除了`BlockingCollection`.我只是没有意识到我可以这样使用`WaitAsync`.谢谢@Noseratio. (2认同)

Jos*_*ant 5

根据要求,这是我最终使用的代码。

工作是在主从配置中设置的,每个主数据都作为批次进行处理。每个工作单元都以这种方式排队:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;
Run Code Online (Sandbox Code Playgroud)

一次缓冲一个主进程,以节省其他外部进程的工作量。每个主站的详细信息都通过masterTransform TransformManyBlock. 还创建ABatchedJoinBlock以收集一批详细信息。

实际工作是detailTransform TransformBlock异步完成的,一次 150 个。BoundedCapacity设置为 300,以确保不会在链的开头缓冲太多 Master,同时也为足够的详细记录排队留出空间,以允许一次处理 150 条记录。该块将 an 输出object到其目标,因为它会根据它是 a 还是 来在链接中进行Detail过滤Exception

收集batchAction ActionBlock所有批次的输出,并为每个批次执行批量数据库更新、错误日志记录等。

会有多个BatchedJoinBlocks,每个 master 一个。由于每个批次ISourceBlock都是按顺序输出的,并且每个批次仅接受与一个主数据关联的明细记录数量,因此批次将按顺序处理。每个块仅输出一组,并且在完成时取消链接。只有最后一个批处理块将其完成传播到最终的ActionBlock

数据流网络:

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
Run Code Online (Sandbox Code Playgroud)