IScheduler.Schedule vs IScheduler.ScheduleAsync?

Mis*_*hex 1 c# system.reactive

IScheduler接口提供

public static IDisposable Schedule(this IScheduler scheduler, Action action)
Run Code Online (Sandbox Code Playgroud)

public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, System.Threading.Tasks.Task<IDisposable>> action)
Run Code Online (Sandbox Code Playgroud)

ScheduleAsync的方法说明:

    // Summary:
    //     Schedules work using an asynchronous method, allowing for cooperative scheduling
    //     in an imperative coding style.
    //
    // Parameters:
    //   scheduler:
    //     Scheduler to schedule work on.
    //
    //   action:
    //     Asynchronous method to run the work, using Yield and Sleep operations for
    //     cooperative scheduling and injection of cancellation points.
    //
    // Returns:
    //     Disposable object that allows to cancel outstanding work on cooperative cancellation
    //     points or through the cancellation token passed to the asynchronous method.
    //
    // Exceptions:
    //   System.ArgumentNullException:
    //     scheduler or action is null.
Run Code Online (Sandbox Code Playgroud)

有人可以解释两种方法之间的差异吗?

我什么时候应该使用ScheduleAsync?

什么时候我应该使用Schedule?

通过允许以命令式编码风格进行协作调度意味着什么?

谢谢.

Jam*_*rld 12

向前

这个答案是基于Rx团队在这篇文章中的直接解释- 警告它很长并且涵盖的不仅仅是这一点.请转到Rx查询运算符中标题为"aschronous"的部分,并解释所有内容,包括ScheduleAsyc标题为" 使调度程序更容易使用"一节中的特定示例"await"

这是我试图解释的:

摘要

主要动机ScheduleAsync是采用C#5的异步/等待功能来简化编写代码,该代码执行许多事件的"公平"调度,否则可能导致其他操作的调度程序不足.这就是"协作调度"的意思 - 与共享调度程序的其他代码一起玩得很好.您可以通过安排下一个事件,然后放弃控制直到该事件触发并挂钩到该事件来安排下一个事件,等等.

在Rx 2.0之前,这是通过递归调度实现的.

天真的例子

以下是链接文章中的示例,该文章提供了Range运算符的实现.这种实现很差,因为它通过不产生控制而使调度程序匮乏:

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.Schedule(() =>
        {
            for (int i = 0; i < count; i++)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(start + i);
            }
            observer.OnCompleted();
        });
    });
}
Run Code Online (Sandbox Code Playgroud)

注意OnNext如何在循环中锤击调度程序而不产生控制(如果调度程序是单线程的,则特别糟糕).它使得其他操作机构无法安排其行动,并且不允许在取消时中止.我们怎么解决这个问题?

递归调度 - Pre-Rx 2.0解决方案

这是通过递归调度解决的旧方法 - 很难看出发生了什么.这不是一种"命令式编码风格".递归调用self()在你第一次看到时非常不透明 - 而在我的情况下是十分之一,尽管我最终得到了它.传奇人物Bart de Smet的这篇经典帖子将告诉你更多关于这项技术的信息.无论如何,这是递归风格:

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.Schedule(0, (i, self) =>
        {
            if (i < count)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(start + i);
                self(i + 1); /* Here is the recursive call */
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}
Run Code Online (Sandbox Code Playgroud)

除了更公平之外,如果处理订阅,下一个待处理的预定行动将被取消.

新的Async/await Style

这是通过async/await的编译器转换继续的新方法,它允许"命令式编码风格".请注意,与递归样式相比的动机是更高的易读性 - 异步/等待突出显示正常情况下.NET发生的事情:

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            for (int i = 0; i < count; i++)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(i);
                await ctrl.Yield(); /* Use a task continuation to schedule next event */
            }
            observer.OnCompleted();

            return Disposable.Empty;
        });
    });
}
Run Code Online (Sandbox Code Playgroud)

它看起来就像一个for循环,但实际上await ctrl.Yield()它将产生控制,允许其他代码进入调度程序.它使用任务延续来一次只调度一个事件 - 也就是说,每次迭代仅在完成前一个迭代时发布到调度程序,从而避免直接在调度程序上排长队.取消也有效,这次Rx框架将订阅的处理转换为通过via传递的取消令牌ct.

如果链接仍然好,我建议阅读我从中获取的原始帖子!