异步任务的顺序处理

Dan*_*rth 32 .net c# .net-4.0 task-parallel-library

假设以下同步代码:

try
{
    Foo();
    Bar();
    Fubar();
    Console.WriteLine("All done");
}
catch(Exception e) // For illustration purposes only. Catch specific exceptions!
{
    Console.WriteLine(e);
}
Run Code Online (Sandbox Code Playgroud)

现在假设所有这些方法都有一个异步的对应,我不得不使用那些由于某种原因,所以简单地包裹了整个事情的新任务是不是一种选择.
我将如何实现相同的行为?
我对"相同"的意思是:

  1. 如果抛出异常,则执行异常处理程序.
  2. 如果抛出异常,则停止执行以下方法.

我唯一能想到的就是可怕:

var fooTask = FooAsync();
fooTask.ContinueWith(t => HandleError(t.Exception),
                     TaskContinuationOptions.OnlyOnFaulted);
fooTask.ContinueWith(
    t =>
    {
        var barTask = BarAsync();
        barTask.ContinueWith(t => HandleError(t.Exception),
                             TaskContinuationOptions.OnlyOnFaulted);
        barTask.ContinueWith(
            t =>
            {
                var fubarTask = FubarAsync();
                fubarTask.ContinueWith(t => HandleError(t.Exception),
                                       TaskContinuationOptions.OnlyOnFaulted);
                fubarTask.ContinueWith(
                    t => Console.WriteLine("All done"),
                    TaskContinuationOptions.OnlyOnRanToCompletion);
            }, 
            TaskContinuationOptions.OnlyOnRanToCompletion);
    }, 
    TaskContinuationOptions.OnlyOnRanToCompletion);
Run Code Online (Sandbox Code Playgroud)

请注意:

  • 我需要一个适用于.NET 4的解决方案,所以async/await不可能.但是,如果它可以async/await随意显示如何工作.
  • 我不需要使用TPL.如果使用TPL是不可能的,那么另一种方法就可以了,也许使用Reactive Extensions?

Ste*_*ary 28

以下是它的工作原理async:

try
{
    await FooAsync();
    await BarAsync();
    await FubarAsync();
    Console.WriteLine("All done");
}
catch(Exception e) // For illustration purposes only. Catch specific exceptions!
{
    Console.WriteLine(e);
}
Run Code Online (Sandbox Code Playgroud)

如果您安装了(预发布)Microsoft.Bcl.Async包,这将适用于.NET 4.0 .


既然你被困在VS2010上,你可以使用Stephen Toub的Then变体:

public static Task Then(this Task first, Func<Task> next)
{
  var tcs = new TaskCompletionSource<object>();
  first.ContinueWith(_ =>
  {
    if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
    else if (first.IsCanceled) tcs.TrySetCanceled();
    else
    {
      try
      {
        next().ContinueWith(t =>
        {
          if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
          else if (t.IsCanceled) tcs.TrySetCanceled();
          else tcs.TrySetResult(null);
        }, TaskContinuationOptions.ExecuteSynchronously);
      }
      catch (Exception exc) { tcs.TrySetException(exc); }
    }
  }, TaskContinuationOptions.ExecuteSynchronously);
  return tcs.Task; 
}
Run Code Online (Sandbox Code Playgroud)

您可以这样使用它:

var task = FooAsync().Then(() => BarAsync()).Then(() => FubarAsync());
task.ContinueWith(t =>
{
  if (t.IsFaulted || t.IsCanceled)
  {
    var e = t.Exception.InnerException;
    // exception handling
  }
  else
  {
    Console.WriteLine("All done");
  }
}, TaskContinuationOptions.ExcecuteSynchronously);
Run Code Online (Sandbox Code Playgroud)

使用Rx,它看起来像这样(假设您没有async已公开的方法IObservable<Unit>):

FooAsync().ToObservable()
    .SelectMany(_ => BarAsync().ToObservable())
    .SelectMany(_ => FubarAsync().ToObservable())
    .Subscribe(_ => { Console.WriteLine("All done"); },
        e => { Console.WriteLine(e); });
Run Code Online (Sandbox Code Playgroud)

我认为.无论如何,我不是Rx大师.:)

  • @DanielHilgarth:Stephen Toub有一篇[博客文章](http://blogs.msdn.com/b/pfxteam/archive/2010/11/21/10094564.aspx),这对我们很有帮助; 他实现了一个`Then`扩展方法,用于异步方法的更自然的顺序链接. (2认同)

Dan*_*rth 7

仅仅为了完整起见,我就是如何实现Chris Sinclair建议的辅助方法:

public void RunSequential(Action onComplete, Action<Exception> errorHandler,
                          params Func<Task>[] actions)
{
    RunSequential(onComplete, errorHandler,
                  actions.AsEnumerable().GetEnumerator());
}

public void RunSequential(Action onComplete, Action<Exception> errorHandler,
                          IEnumerator<Func<Task>> actions)
{
    if(!actions.MoveNext())
    {
        onComplete();
        return;
    }

    var task = actions.Current();
    task.ContinueWith(t => errorHandler(t.Exception),
                      TaskContinuationOptions.OnlyOnFaulted);
    task.ContinueWith(t => RunSequential(onComplete, errorHandler, actions),
                      TaskContinuationOptions.OnlyOnRanToCompletion);
}
Run Code Online (Sandbox Code Playgroud)

这确保了仅在前一个任务成功完成时才请求每个后续任务.
它假定Func<Task>返回已经运行的任务.


Ser*_*rvy 5

你在这里拥有的本质上是一个ForEachAsync.您希望按顺序运行每个异步项,但需要一些错误处理支持.这是一个这样的实现:

public static Task ForEachAsync(IEnumerable<Func<Task>> tasks)
{
    var tcs = new TaskCompletionSource<bool>();

    Task currentTask = Task.FromResult(false);

    foreach (Func<Task> function in tasks)
    {
        currentTask.ContinueWith(t => tcs.TrySetException(t.Exception.InnerExceptions)
            , TaskContinuationOptions.OnlyOnFaulted);
        currentTask.ContinueWith(t => tcs.TrySetCanceled()
                , TaskContinuationOptions.OnlyOnCanceled);
        Task<Task> continuation = currentTask.ContinueWith(t => function()
            , TaskContinuationOptions.OnlyOnRanToCompletion);
        currentTask = continuation.Unwrap();
    }

    currentTask.ContinueWith(t => tcs.TrySetException(t.Exception.InnerExceptions)
            , TaskContinuationOptions.OnlyOnFaulted);
    currentTask.ContinueWith(t => tcs.TrySetCanceled()
            , TaskContinuationOptions.OnlyOnCanceled);
    currentTask.ContinueWith(t => tcs.TrySetResult(true)
            , TaskContinuationOptions.OnlyOnRanToCompletion);

    return tcs.Task;
}
Run Code Online (Sandbox Code Playgroud)

我也支持取消任务,只是为了更一般,因为它花了很少的事情.

它将每个任务添加为上一个任务的延续,并且在整个过程中它确保任何异常都会导致设置最终任务的异常.

以下是一个示例用法:

public static Task FooAsync()
{
    Console.WriteLine("Started Foo");
    return Task.Delay(1000)
        .ContinueWith(t => Console.WriteLine("Finished Foo"));
}

public static Task BarAsync()
{
    return Task.Factory.StartNew(() => { throw new Exception(); });
}

private static void Main(string[] args)
{
    List<Func<Task>> list = new List<Func<Task>>();

    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => BarAsync());

    Task task = ForEachAsync(list);

    task.ContinueWith(t => Console.WriteLine(t.Exception.ToString())
        , TaskContinuationOptions.OnlyOnFaulted);
    task.ContinueWith(t => Console.WriteLine("Done!")
        , TaskContinuationOptions.OnlyOnRanToCompletion);
}
Run Code Online (Sandbox Code Playgroud)