基于阅读这个问题:SubscribeOn和ObserveOn之间有什么区别
ObserveOn设置Subscribe处理程序中代码的执行位置:
stream.Subscribe(_ => { // this code here });
该SubscribeOn方法设置完成流的设置的线程.
我明白如果没有明确设置,那么使用TaskPool.
现在我的问题是,让我说我做这样的事情:
Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));
哪里有Where predicate和SelectMany lots_of执行,因为some_action正在上调度执行?
我有一个winform应用程序,一个可观察的设置如下:
Form form = new Form();
Label lb = new Label();
form.Controls.Add(lb);
Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(l => lb.Text = l.ToString());
Application.Run(form);
Run Code Online (Sandbox Code Playgroud)
这不起作用,因为l => lb.Text = l.ToString()不会在创建表单的主线程上运行,但我无法弄清楚如何使它在此线程上运行.我想,我应该用IObservable.SubscribeOn这需要一个或者IScheduler或SynchronizationContext,但我不知道如何让主线程的的SynchronizationContext,唯一的调度程序,我能找到人的静态属性Scheduler,如Scheduler.CurrentThread,Immediate,NewThread,TaskPool和ThreadPool,没有一个有效.
我的Rx版本是1.0.10621.
我很难找到在我的 ViewModel 中安排长时间运行的反应性属性“getter”的正确方法。
RX 简介的这段摘录准确地描述了我想要做的事情:
- 响应某种用户操作
- 在后台线程上工作
- 将结果传回 UI 线程
- 更新用户界面
只有在这种情况下,除了用户交互之外,我才想对其他属性的变化做出反应。
下面是我用来从原始属性获取派生属性的通用模板(在实际代码中,有级联派生属性链)。
在 Reactive ViewModel(从 ReactiveObject 继承)中,我已经有一些从其他属性派生的属性。例如,当Original发生变化时,Derived将重新计算。
public TOriginal Original
{
get { return _original; }
set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;
public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<double[,]> _derived;
this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
// ObserveOn? SubscribeOn? Which scheduler?
.Select(derivedValue => LongRunningCalculation(originalValue))
// Same thing here: ObserveOn? SubscribeOn? Which scheduler? …Run Code Online (Sandbox Code Playgroud) 给定一个同步观察者,有没有办法做到这一点:
observable.SubscribeAsync(observer);
Run Code Online (Sandbox Code Playgroud)
并且observer异步调用所有方法或者是在创建观察者时必须处理的内容吗?
c# multithreading asynchronous reactive-programming system.reactive