Mis*_*hex 1 c# system.reactive
IScheduler接口提供
public static IDisposable Schedule(this IScheduler scheduler, Action action)
Run Code Online (Sandbox Code Playgroud)
和
public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, System.Threading.Tasks.Task<IDisposable>> action)
Run Code Online (Sandbox Code Playgroud)
ScheduleAsync的方法说明:
// Summary:
// Schedules work using an asynchronous method, allowing for cooperative scheduling
// in an imperative coding style.
//
// Parameters:
// scheduler:
// Scheduler to schedule work on.
//
// action:
// Asynchronous method to run the work, using Yield and Sleep operations for
// cooperative scheduling and injection of cancellation points.
//
// Returns:
// Disposable object that allows to cancel outstanding work on cooperative cancellation
// points or through the cancellation token passed to the asynchronous method.
//
// Exceptions:
// System.ArgumentNullException:
// scheduler or action is null.
Run Code Online (Sandbox Code Playgroud)
有人可以解释两种方法之间的差异吗?
我什么时候应该使用ScheduleAsync?
什么时候我应该使用Schedule?
通过允许以命令式编码风格进行协作调度意味着什么?
谢谢.
Jam*_*rld 12
这个答案是基于Rx团队在这篇文章中的直接解释- 警告它很长并且涵盖的不仅仅是这一点.请转到Rx查询运算符中标题为"aschronous"的部分,并解释所有内容,包括ScheduleAsyc标题为" 使调度程序更容易使用"一节中的特定示例"await"
这是我试图解释的:
主要动机ScheduleAsync是采用C#5的异步/等待功能来简化编写代码,该代码执行许多事件的"公平"调度,否则可能导致其他操作的调度程序不足.这就是"协作调度"的意思 - 与共享调度程序的其他代码一起玩得很好.您可以通过安排下一个事件,然后放弃控制直到该事件触发并挂钩到该事件来安排下一个事件,等等.
在Rx 2.0之前,这是通过递归调度实现的.
以下是链接文章中的示例,该文章提供了Range运算符的实现.这种实现很差,因为它通过不产生控制而使调度程序匮乏:
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.Schedule(() =>
{
for (int i = 0; i < count; i++)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(start + i);
}
observer.OnCompleted();
});
});
}
Run Code Online (Sandbox Code Playgroud)
注意OnNext如何在循环中锤击调度程序而不产生控制(如果调度程序是单线程的,则特别糟糕).它使得其他操作机构无法安排其行动,并且不允许在取消时中止.我们怎么解决这个问题?
这是通过递归调度解决的旧方法 - 很难看出发生了什么.这不是一种"命令式编码风格".递归调用self()在你第一次看到时非常不透明 - 而在我的情况下是十分之一,尽管我最终得到了它.传奇人物Bart de Smet的这篇经典帖子将告诉你更多关于这项技术的信息.无论如何,这是递归风格:
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.Schedule(0, (i, self) =>
{
if (i < count)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(start + i);
self(i + 1); /* Here is the recursive call */
}
else
{
observer.OnCompleted();
}
});
});
}
Run Code Online (Sandbox Code Playgroud)
除了更公平之外,如果处理订阅,下一个待处理的预定行动将被取消.
这是通过async/await的编译器转换继续的新方法,它允许"命令式编码风格".请注意,与递归样式相比的动机是更高的易读性 - 异步/等待突出显示正常情况下.NET发生的事情:
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
for (int i = 0; i < count; i++)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(i);
await ctrl.Yield(); /* Use a task continuation to schedule next event */
}
observer.OnCompleted();
return Disposable.Empty;
});
});
}
Run Code Online (Sandbox Code Playgroud)
它看起来就像一个for循环,但实际上await ctrl.Yield()它将产生控制,允许其他代码进入调度程序.它使用任务延续来一次只调度一个事件 - 也就是说,每次迭代仅在完成前一个迭代时发布到调度程序,从而避免直接在调度程序上排长队.取消也有效,这次Rx框架将订阅的处理转换为通过via传递的取消令牌ct.
如果链接仍然好,我建议阅读我从中获取的原始帖子!