限制.NET 4.5中的并发任务量

mar*_*ark 11 .net asynchronous

观察者以下功能:

public Task RunInOrderAsync<TTaskSeed>(IEnumerable<TTaskSeed> taskSeedGenerator,
    CreateTaskDelegate<TTaskSeed> createTask,
    OnTaskErrorDelegate<TTaskSeed> onError = null,
    OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class
{
    Action<Exception, TTaskSeed> onFailed = (exc, taskSeed) =>
    {
        if (onError != null)
        {
            onError(exc, taskSeed);
        }
    };

    Action<Task> onDone = t =>
    {
        var taskSeed = (TTaskSeed)t.AsyncState;
        if (t.Exception != null)
        {
            onFailed(t.Exception, taskSeed);
        }
        else if (onSuccess != null)
        {
            onSuccess(t, taskSeed);
        }
    };

    var enumerator = taskSeedGenerator.GetEnumerator();
    Task task = null;
    while (enumerator.MoveNext())
    {
        if (task == null)
        {
            try
            {
                task = createTask(enumerator.Current);
                Debug.Assert(ReferenceEquals(task.AsyncState, enumerator.Current));
            }
            catch (Exception exc)
            {
                onFailed(exc, enumerator.Current);
            }
        }
        else
        {
            task = task.ContinueWith((t, taskSeed) =>
            {
                onDone(t);
                var res = createTask((TTaskSeed)taskSeed);
                Debug.Assert(ReferenceEquals(res.AsyncState, taskSeed));
                return res;
            }, enumerator.Current).TaskUnwrap();
        }
    }

    if (task != null)
    {
        task = task.ContinueWith(onDone);
    }

    return task;
}
Run Code Online (Sandbox Code Playgroud)

TaskUnwrap国家保留标准的版本在哪里Task.Unwrap:

public static class Extensions
{
    public static Task TaskUnwrap(this Task<Task> task, object state = null)
    {
        return task.Unwrap().ContinueWith((t, _) =>
        {
            if (t.Exception != null)
            {
                throw t.Exception;
            }
        }, state ?? task.AsyncState);
    }
}
Run Code Online (Sandbox Code Playgroud)

RunInOrderAsync方法允许异步运行N个任务,但是顺序地 - 一个接一个地运行.实际上,它运行从给定种子创建的任务,并发限制为1.

让我们假设createTask委托从种子创建的任务不对应于多个并发任务.

现在,我想抛出maxConcurrencyLevel参数,所以函数签名看起来像这样:

Task RunInOrderAsync<TTaskSeed>(int maxConcurrencyLevel,
  IEnumerable<TTaskSeed> taskSeedGenerator,
  CreateTaskDelegate<TTaskSeed> createTask,
  OnTaskErrorDelegate<TTaskSeed> onError = null,
  OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class
Run Code Online (Sandbox Code Playgroud)

在这里,我有点卡住了.

SO有这样的问题:

这基本上提出了两种方法来解决问题:

  1. 使用Parallel.ForEachParallelOptions指定MaxDegreeOfParallelism属性值等于所需的最大并发级别.
  2. 使用TaskScheduler具有所需MaximumConcurrencyLevel值的自定义.

第二种方法不会削减它,因为所涉及的所有任务必须使用相同的任务调度程序实例.为此,用于返回a的所有方法Task必须具有接受自定义TaskScheduler实例的重载.不幸的是,微软在这方面并不十分一致.例如,SqlConnection.OpenAsync不接受这样的论证(但TaskFactory.FromAsync确实如此).

第一种方法暗示我必须将任务转换为动作,如下所示:

() => t.Wait()
Run Code Online (Sandbox Code Playgroud)

我不确定这是一个好主意,但我很乐意获得更多的意见.

另一种方法是利用TaskFactory.ContinueWhenAny,但这是混乱的.

有任何想法吗?

编辑1

我想澄清想要限制的原因.我们的任务最终针对同一SQL服务器执行SQL语句.我们想要的是一种限制并发传出SQL语句数量的方法.完全有可能会有其他SQL语句从其他代码段同时执行,但这一个是批处理器,可能会泛滥服务器.

现在,请注意,虽然我们谈论的是同一个SQL服务器,但同一台服务器上有许多数据库.因此,它不是限制开放SQL连接到同一数据库的数量,因为数据库可能根本不相同.

这就是为什么厄运日的解决方案ThreadPool.SetMaxThreads()无关紧要.

现在,关于SqlConnection.OpenAsync.由于某种原因它被异步 - 它可能会向服务器进行往返,因此可能会受到网络延迟和分布式环境的其他可爱副作用的影响.因此,它与接受TaskScheduler参数的其他异步方法没有什么不同.我倾向于认为不接受一个只是一个错误.

编辑2

我想保留原始函数的异步精神.因此,我希望避免任何明确的阻止解决方案.

编辑3

感谢@ fsimonazzi的回答,我现在已经实现了所需功能.这是代码:

        var sem = new SemaphoreSlim(maxConcurrencyLevel);
        var tasks = new List<Task>();

        var enumerator = taskSeedGenerator.GetEnumerator();
        while (enumerator.MoveNext())
        {
            tasks.Add(sem.WaitAsync().ContinueWith((_, taskSeed) =>
            {
                Task task = null;
                try
                {
                    task = createTask((TTaskSeed)taskSeed);
                    if (task != null)
                    {
                        Debug.Assert(ReferenceEquals(task.AsyncState, taskSeed));
                        task = task.ContinueWith(t =>
                        {
                            sem.Release();
                            onDone(t);
                        });
                    }
                }
                catch (Exception exc)
                {
                    sem.Release();
                    onFailed(exc, (TTaskSeed)taskSeed);
                }
                return task;
            }, enumerator.Current).TaskUnwrap());
        }

        return Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => sem.Dispose());
Run Code Online (Sandbox Code Playgroud)

fsi*_*zzi 13

您可以使用信号量来限制处理.使用WaitAsync()方法可以获得预期的异步.像这样的东西(为简洁起见,删除了错误处理):

private static async Task DoStuff<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask)
{
    using (var sem = new SemaphoreSlim(maxConcurrency))
    {
        var tasks = new List<Task>();

        foreach (var item in items)
        {
            await sem.WaitAsync();
            var task = createTask(item).ContinueWith(t => sem.Release());
            tasks.Add(task);
        }

        await Task.WhenAll(tasks);
    }
}
Run Code Online (Sandbox Code Playgroud)

编辑以删除在所有释放操作都有机会执行之前可以处置信号量的错误.


Pet*_*ons 5

这里已经有很多答案了。我想解决您在 Stephens 回答中发表的关于使用 TPL 数据流限制并发的示例的评论。即使你在这个问题的另一个答案中留下了评论,你不再使用基于任务的方法,它可能会帮助其他人。

ActionBlock<T>为此使用的示例是:

private static async Task DoStuff<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask)
{
    var ab = new ActionBlock<T>(createTask, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxConcurrency });

    foreach (var item in items)
    {
        ab.Post(item);
    }

    ab.Complete();
    await ab.Completion;
}
Run Code Online (Sandbox Code Playgroud)

有关 TPL 数据流的更多信息,请访问:https : //msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow(v= vs.110).aspx