如何等待一系列任务并停止等待第一个异常?

The*_*ias 5 .net c# task-parallel-library async-await

我有一系列任务,正在等他们Task.WhenAll。我的任务经常失败,在这种情况下,我会向用户显示一个消息框,以便她可以重试。我的问题是报告错误的时间延迟到所有任务完成为止。相反,我想在第一个任务抛出异常后立即通知用户。换句话说,我希望该版本Task.WhenAll能够快速失败。由于不存在这样的内置方法,因此我尝试创建自己的方法,但是我的实现未达到我想要的方式。这是我想出的:

public static async Task<TResult[]> WhenAllFailFast<TResult>(
    params Task<TResult>[] tasks)
{
    foreach (var task in tasks)
    {
        await task.ConfigureAwait(false);
    }
    return await Task.WhenAll(tasks).ConfigureAwait(false);
}
Run Code Online (Sandbox Code Playgroud)

通常,它的抛出速度比本机快Task.WhenAll,但通常不够快。在任务#1完成之前将不会观察到有故障的任务#2。我如何改进它以使其尽快失效?


更新:关于取消,这不是我现在的要求,但是可以说,为了保持一致,第一个被取消的任务应立即停止等待。在这种情况下,从返回的合并任务WhenAllFailFast应具有Status == TaskStatus.Canceled

说明:取消方案是关于用户单击“ 取消”按钮以阻止任务完成。它不是要在发生异常时自动取消未完成的任务。

Zal*_*nGG 7

最好的WhenAllFailFast办法是使用TaskCompletionSource. 您可以使用 .ContinueWith() 每个具有同步延续的输入任务,当任务以故障状态结束时(使用相同的异常对象),该任务会导致 TCS 出错。

也许类似(未完全测试):

using System;
using System.Threading;
using System.Threading.Tasks;

namespace stackoverflow
{
    class Program
    {
        static async Task Main(string[] args)
        {

            var cts = new CancellationTokenSource();
            cts.Cancel();
            var arr = await WhenAllFastFail(
                Task.FromResult(42),
                Task.Delay(2000).ContinueWith<int>(t => throw new Exception("ouch")),
                Task.FromCanceled<int>(cts.Token));

            Console.WriteLine("Hello World!");
        }

        public static Task<TResult[]> WhenAllFastFail<TResult>(params Task<TResult>[] tasks)
        {
            if (tasks is null || tasks.Length == 0) return Task.FromResult(Array.Empty<TResult>());

            // defensive copy.
            var defensive = tasks.Clone() as Task<TResult>[];

            var tcs = new TaskCompletionSource<TResult[]>();
            var remaining = defensive.Length;

            Action<Task> check = t =>
            {
                switch (t.Status)
                {
                    case TaskStatus.Faulted:
                        // we 'try' as some other task may beat us to the punch.
                        tcs.TrySetException(t.Exception.InnerException);
                        break;
                    case TaskStatus.Canceled:
                        // we 'try' as some other task may beat us to the punch.
                        tcs.TrySetCanceled();
                        break;
                    default:

                        // we can safely set here as no other task remains to run.
                        if (Interlocked.Decrement(ref remaining) == 0)
                        {
                            // get the results into an array.
                            var results = new TResult[defensive.Length];
                            for (var i = 0; i < tasks.Length; ++i) results[i] = defensive[i].Result;
                            tcs.SetResult(results);
                        }
                        break;
                }
            };

            foreach (var task in defensive)
            {
                task.ContinueWith(check, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
            }

            return tcs.Task;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

编辑:解包 AggregateException,取消支持,返回结果数组。防御数组突变、空值和空值。显式 TaskScheduler。

  • 我建议使用“await”进行延续,例如使用本地“async”方法。如果您确实使用“ContinueWith”,则[应该传递“TaskScheduler”](https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html)。 (2认同)

The*_*ias 5

我最近再次需要该WhenAllFailFast方法,我修改了@ZaldronGG 的优秀解决方案,使其性能更高(并且更符合 Stephen Cleary 的建议)。下面的实现在我的 PC 中每秒处理大约 3,500,000 个任务。

public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
    if (tasks is null) throw new ArgumentNullException(nameof(tasks));
    if (tasks.Length == 0) return Task.FromResult(new TResult[0]);

    var results = new TResult[tasks.Length];
    var remaining = tasks.Length;
    var tcs = new TaskCompletionSource<TResult[]>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    for (int i = 0; i < tasks.Length; i++)
    {
        var task = tasks[i];
        if (task == null) throw new ArgumentException(
            $"The {nameof(tasks)} argument included a null value.", nameof(tasks));
        HandleCompletion(task, i);
    }
    return tcs.Task;

    async void HandleCompletion(Task<TResult> task, int index)
    {
        try
        {
            var result = await task.ConfigureAwait(false);
            results[index] = result;
            if (Interlocked.Decrement(ref remaining) == 0)
            {
                tcs.TrySetResult(results);
            }
        }
        catch (OperationCanceledException)
        {
            tcs.TrySetCanceled();
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)