Dax*_*ohl 21 c# system.reactive async-await c#-5.0 .net-4.5
因此,在C#4.0的悲伤时期,我创建了以下"WorkflowExecutor"类,它允许在GUI线程中通过入侵IEnumerable的"yield return"延续来等待可观察的异步工作流.因此,以下代码将在button1Click处启动一个简单的工作流程来更新文本,等待您单击button2,并在1秒后循环.
public sealed partial class Form1 : Form {
readonly Subject<Unit> _button2Subject = new Subject<Unit>();
readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();
public Form1() {
InitializeComponent();
}
IEnumerable<IObservable<Unit>> CreateAsyncHandler() {
Text = "Initializing";
var scheduler = new ControlScheduler(this);
while (true) {
yield return scheduler.WaitTimer(1000);
Text = "Waiting for Click";
yield return _button2Subject;
Text = "Click Detected!";
yield return scheduler.WaitTimer(1000);
Text = "Restarting";
}
}
void button1_Click(object sender, EventArgs e) {
_workflowExecutor.Run(CreateAsyncHandler());
}
void button2_Click(object sender, EventArgs e) {
_button2Subject.OnNext(Unit.Default);
}
void button3_Click(object sender, EventArgs e) {
_workflowExecutor.Stop();
}
}
public static class TimerHelper {
public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) {
return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default);
}
}
public sealed class WorkflowExecutor {
IEnumerator<IObservable<Unit>> _observables;
IDisposable _subscription;
public void Run(IEnumerable<IObservable<Unit>> actions) {
_observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator();
Continue();
}
void Continue() {
if (_subscription != null) {
_subscription.Dispose();
}
if (_observables.MoveNext()) {
_subscription = _observables.Current.Subscribe(_ => Continue());
}
}
public void Stop() {
Run(null);
}
}
Run Code Online (Sandbox Code Playgroud)
这个想法的聪明部分,使用"yield"延续来完成异步工作,取自Daniel Earwicker的AsyncIOPipe想法:http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield- return-of-lambdas /,然后我在它上面添加了反应框架.
现在我无法使用C#5.0中的异步功能重写它,但看起来它应该是直截了当的事情.当我将observable转换为任务时,它们只运行一次而while循环第二次崩溃.任何帮助修复都会很棒.
所有那些说/问,async/await机制给我的WorkflowExecutor没有什么?我可以用async/await做任何我不能做的事情(给出类似数量的代码)和WorkflowExecutor吗?
小智 31
正如James所提到的,你可以等待从Rx v2.0 Beta开始的IObservable <T>序列.行为是返回最后一个元素(在OnCompleted之前),或抛出观察到的OnError.如果序列不包含任何元素,则会出现InvalidOperationException.
请注意使用此功能,您可以获得所有其他所需的行为:
你可以做更多奇特的事情,比如计算聚合的结果,但是通过使用Do和Scan来观察中间值:
var xs = Observable.Range(0, 10, Scheduler.Default);
var res = xs.Scan((x, y) => x + y)
.Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); });
Console.WriteLine("Done! The sum is {0}", await res);
Run Code Online (Sandbox Code Playgroud)
Jam*_*ing 29
正如您所注意到的,Task是一次性使用的东西,而不是Observable的"事件流".一个很好的思考方式(恕我直言)是Rx团队关于2.0 Beta的帖子的2x2图表:

根据情况(一次性与事件的"流"),保持Observable可能更有意义.
如果您可以跳到Reactive 2.0 Beta,那么您可以"等待"可观察量.例如,我自己尝试代码的"异步/等待"(近似)版本将是:
public sealed partial class Form1 : Form
{
readonly Subject<Unit> _button2Subject = new Subject<Unit>();
private bool shouldRun = false;
public Form1()
{
InitializeComponent();
}
async Task CreateAsyncHandler()
{
Text = "Initializing";
while (shouldRun)
{
await Task.Delay(1000);
Text = "Waiting for Click";
await _button2Subject.FirstAsync();
Text = "Click Detected!";
await Task.Delay(1000);
Text = "Restarting";
}
}
async void button1_Click(object sender, EventArgs e)
{
shouldRun = true;
await CreateAsyncHandler();
}
void button2_Click(object sender, EventArgs e)
{
_button2Subject.OnNext(Unit.Default);
}
void button3_Click(object sender, EventArgs e)
{
shouldRun = false;
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
15354 次 |
| 最近记录: |