如何使顺序处理像并行处理一样简单

Bre*_*ias 5 c# asynchronous .net-4.0 system.reactive .net-4.5

我有两个.net Task对象,我可能希望以parellel或顺序运行.在任何一种情况下,我都不想阻止线程等待它们.事实证明,Reactive Extensions让并行故事变得美妙.但是当我尝试按顺序排列任务时,代码可以工作,但感觉很尴尬.

我想知道是否有人可以展示如何使顺序版本更简洁或编码为并行版本毫不费力.没有必要使用反应式扩展来回答这个问题.

作为参考,这是我的两个并行和顺序处理解决方案.

并行处理版本

这是纯粹的快乐:

    public Task<string> DoWorkInParallel()
    {
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
        Task<bool> BravoTask = Task.Factory.StartNew(() => true);

        //Prepare for Rx, and set filters to allow 'Zip' to terminate early
        //in some cases.
        IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
        IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);

        Observable
            .Zip(
                AsyncAlpha,
                AsyncBravo,
                (x, y) => y.ToString() + x.ToString())
            .Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
                (x) => { result.TrySetResult(x); },
                (x) => { result.TrySetException(x.GetBaseException()); },
                () => { result.TrySetResult("Nothing"); });

        return result.Task;
    }
Run Code Online (Sandbox Code Playgroud)

顺序/流水线处理版本

这有效,但只是笨拙:

    public Task<string> DoWorkInSequence()
    {
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);

        AlphaTask.ContinueWith(x =>
        {
            if (x.IsFaulted)
            {
                result.TrySetException(x.Exception.GetBaseException());
            }
            else
            {
                if (x.Result != 5)
                {
                    Task<bool> BravoTask = Task.Factory.StartNew(() => true);
                    BravoTask.ContinueWith(y =>
                    {
                        if (y.IsFaulted)
                        {
                            result.TrySetException(y.Exception.GetBaseException());
                        }
                        else
                        {
                            if (y.Result)
                            {
                                result.TrySetResult(x.Result.ToString() + y.Result.ToString());
                            }
                            else
                            {
                                result.TrySetResult("Nothing");
                            }
                        }
                    });
                }
                else
                {
                    result.TrySetResult("Nothing");
                }
            }
        }
        );

        return result.Task;
    }
Run Code Online (Sandbox Code Playgroud)

在上面的顺序代码中,它变得一团糟,我甚至没有添加超时功能来匹配并行版本!

要求(8/6更新)

对于那些回答,请注意:

  1. 顺序场景应该允许第一任务的输出馈送第二任务的输入的布置.我上面的示例"笨拙"代码很容易被安排来实现这一点.

  2. 我对.net 4.5的答案很感兴趣 - 但.net 4.0的答案对我来说同样重要或者更重要.

  3. 任务'Alpha'和'Bravo'的合并时限为200毫秒,可以完成; 他们每人没有200ms.在连续的情况下也是如此.

  4. 如果任务返回无效结果,则SourceCompletionTask必须在两个任务完成之前提前完成.无效结果是[AlphaTask:5]或[BravoTask:false],如示例代码中的显式测试所示.
    更新8/8:澄清 - 在顺序的情况下,如果AlphaTask不成功或者超时已经发生,则BravoTask根本不应该执行.

  5. 假设AlphaTask和BravoTask都无法阻止.这并不重要,但在我的真实场景中,它们实际上是异步WCF服务调用.

也许有一个Rx的方面我可以利用来清理顺序版本.但即使只是任务编程本身应该有一个我想象的更好的故事.走着瞧.

ERRATA在两个代码示例中,我将返回类型更改为Task,因为海报答案非常正确,我不应该返回TaskCompletionSource.

Gid*_*rth 4

如果您可以使用 async/await,布兰登有一个很好的答案。如果您仍在使用 VS2010,那么清理顺序版本时我要做的第一件事就是获取一个扩展方法,例如ThenStephen Toub 在博客文章中描述的方法。Task.FromResult如果您不使用 .NET 4.5,我也会实现一个方法。有了这些,你可以获得:

public Task<string> DoWorkInSequence()
{
    return Task.FromResult(4)
           .Then(x => 
                 { if (x != 5)
                   {
                       return Task.FromResult(true)
                              .Then(y => 
                                    { if (y)
                                      {
                                          return Task.FromResult(x.ToString() + y.ToString());
                                      }
                                      else
                                      {
                                          return Task.FromResult("Nothing");
                                      }
                                    });
                    }
                    else
                    {
                        return Task.FromResult("Nothing");
                    }
                 });
}
Run Code Online (Sandbox Code Playgroud)

另外,您通常应该返回一个 Task 而不是 TaskCompletionSource(可以通过调用.TaskTaskCompletionSource 来获取),因为您不希望调用者在您返回给他们的任务上设置结果。

布兰登的答案还提供了实现超时功能的好方法(针对缺少 async/await 关键字进行调整)。

编辑 为了减少箭头代码,我们可以实现更多的 LINQ 方法。之前链接的博客文章中提供了 SelectMany 实现。LINQ 需要的其他方法是 Select 和Where。完成 Then 和 SelectMany 后,这些应该相当简单,但它们是:

public static Task<T> Where<T>(this Task<T> task, Func<T, bool> predicate)
{
    if (task == null) throw new ArgumentNullException("task");
    if (predicate == null) throw new ArgumentNullException("predicate");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
        {
            if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled) tcs.TrySetCanceled();
            else
            {
                try
                {
                    if (predicate(completed.Result))
                        tcs.TrySetResult(completed.Result);
                    else
                        tcs.TrySetCanceled();
                }
                catch (Exception ex)
                {
                    tcs.TrySetException(ex);
                }
            }
        });
    return tcs.Task;
}

public static Task<TResult> Select<T, TResult>(this Task<T> task, Func<T, TResult> selector)
{
    if (task == null) throw new ArgumentNullException("task");
    if (selector == null) throw new ArgumentNullException("selector");

    var tcs = new TaskCompletionSource<TResult>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetCanceled();
        else
        {
            try
            {
                tcs.TrySetResult(selector(completed.Result));
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        }
    });
    return tcs.Task;
}
Run Code Online (Sandbox Code Playgroud)

之后,最后一个非 LINQ 扩展方法允许在取消时返回默认值:

public static Task<T> IfCanceled<T>(this Task<T> task, T defaultValue)
{
    if (task == null) throw new ArgumentNullException("task");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetResult(defaultValue);
        else tcs.TrySetResult(completed.Result);
    });
    return tcs.Task;
}
Run Code Online (Sandbox Code Playgroud)

以及新的和改进的 DoWork(无超时):

public static Task<string> DoWorkInSequence()
{
    return (from x in Task_FromResult(5)
            where x != 5
            from y in Task_FromResult(true)
            where y
            select x.ToString() + y.ToString()
           ).IfCanceled("Nothing");
}
Run Code Online (Sandbox Code Playgroud)

Brandon 的答案中的 Timeout 方法(一旦重写,如果需要,无需异步/等待)可以卡在链的末尾,以实现整体超时和/或在链中的每个步骤之后,如果您想阻止进一步的步骤在达到总体超时。链中断的另一种可能性是让所有单独的步骤都采用取消令牌并修改 Timeout 方法以采用 CancellationTokenSource 并在发生超时时取消它,并抛出超时异常。

编辑(布伦特·阿里亚斯)

从你所提出的想法中汲取灵感,我从我的视角中设计出了我认为的最终答案。它基于ParallelExtensionsExtras的 nuget 包中的 .net 4.0 扩展方法。下面的示例添加了第三个任务,以帮助说明根据我规定的要求对顺序任务进行编程的“感觉”:

public Task<string> DoWorkInSequence()
{
    var cts = new CancellationTokenSource();

    Task timer = Task.Factory.StartNewDelayed(200, () => { cts.Cancel(); });

    Task<int> AlphaTask = Task.Factory
        .StartNew(() => 4 )
        .Where(x => x != 5 && !cts.IsCancellationRequested);

    Task<bool> BravoTask = AlphaTask
        .Then(x => true)
        .Where(x => x && !cts.IsCancellationRequested);

    Task<int> DeltaTask = BravoTask
        .Then(x => 7)
        .Where(x => x != 8);

    Task<string> final = Task.Factory
        .WhenAny(DeltaTask, timer)
        .ContinueWith(x => !DeltaTask.IsCanceled && DeltaTask.Status == TaskStatus.RanToCompletion
            ? AlphaTask.Result.ToString() + BravoTask.Result.ToString() + DeltaTask.Result.ToString(): "Nothing");

    //This is here just for experimentation.  Placing it at different points
    //above will have varying effects on what tasks were cancelled at a given point in time.
    cts.Cancel();

    return final;
}
Run Code Online (Sandbox Code Playgroud)

在这次讨论和共同努力中,我提出了一些重要的观察结果:

  • 在简单的情况下使用“Then”扩展很好,但值得注意的是它的适用性有限。对于更复杂的情况,需要将其替换为,例如.ContinueWith(x => true, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default)。当针对我所说的场景将“Then”替换为“ContinueWith”时,添加该选项至关重要OnlyOnRanToCompletion
  • 使用超时扩展最终在我的场景中不起作用。这是因为它只会导致取消它立即附加到的任务,而不是取消序列中所有先前的任务实例。这就是为什么我转向这一StartNewDelayed(...)策略并在每个条款中添加了快速取消检查Where
  • 尽管 ParallelExtensionsExtras 库定义了您使用的LINQ to Tasks,但我得出的结论是,最好远离任务的 LINQ 风格外观。这是因为 LINQ 的任务非常深奥;这可能会让普通开发人员感到困惑。让他们理解异步编码已经够难的了。甚至 LINQ to Tasks 的作者也表示“这个 LINQ 实现在实践中的有用性是有争议的,但至少它提供了一个有趣的思维练习。” 是的,同意,这是一个有趣的思维练习。当然,我至少必须承认“Where”LINQ to Tasks 方法,因为它在我上面列出的解决方案中发挥了关键作用。