Dan*_*erg 8 c# multithreading system.reactive observable
我正在努力解决关于订阅Observable.FromEventPattern()
on的问题上的一些并发问题TaskPoolScheduler
.
让我用代码示例来说明:
var dataStore = new DataStore();
Observable.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
.SubscribeOn(TaskPoolScheduler.Default)
.Select(x => x.EventArgs)
.StartWith(new DataChangedEventArgs())
.Throttle(TimeSpan.FromMilliseconds(25))
.Select(x =>
{
Thread.Sleep(5000); // Simulate long-running calculation.
var result = 42;
return result;
})
.ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
.Subscribe(result =>
{
// Do some interesting work with the result.
// ...
// Do something that makes the DataStore raise another event.
dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
});
dataStore.RaiseDataChangedEvent(); // <- Returns immediately, i.e. does NOT wait for long-running calculation.
dataStore.RaiseDataChangedEvent(); // <- Blocks while waiting for the previous long-running calculation to complete, then returns eventually.
Run Code Online (Sandbox Code Playgroud)
我的问题是,当原始observable发出任何新项目Observable.FromEventPattern()
时(即当DataStore
对象引发新DataChanged
事件时),它们似乎被阻塞,等待前面的项目完成流经整个管道.
由于订阅是在TaskPoolScheduler
我预期的每一个新项目上完成的,只是简单地启动一个新任务,但实际上,如果管道繁忙,事件源似乎阻止了事件调用.
如何在其自己的任务/线程上完成执行每个新发出项(引发事件)的订阅,以便源对象永远不会阻塞其内部DataChangedEvent.Invoke()
调用?
(当然,除了Subscribe()
应该在UI线程上执行的lambda 之外- 已经是这种情况了.)
作为旁注:#jxnet Slack通道中提到的@jonstodle TaskPoolScheduler
可能具有与我假设的语义不同的语义.具体来说,他说它可能会创建一个任务,并在该一个任务内的事件循环中进行订阅和生成值.但如果是这种情况,那么我发现第一个事件调用没有阻塞(因为第二个事件调用),这有点奇怪.在我看来,如果执行订阅的任务池任务是异步的,以至于源不必在第一次调用时阻塞,那么是否需要在第二次调用时阻止它?
您遇到的问题仅仅是Rx的工作方式 - 在普通Rx管道中生成的每个值都是流水线的,并且一次只处理一个值.如果Rx管道的源,在您的情况下,FromEventPattern<DataChangedEventArgs>
生成值比观察者处理它们更快,那么它们将在管道中排队.
规则是管道中的每个观察者一次只处理一个值.这适用于任何调度程序,而不仅仅是TaskPoolScheduler
.
使它按照您想要的方式工作的方法非常简单 - 您创建并行管道,然后将值合并回一个管道.
这是改变:
Observable
.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
.SubscribeOn(TaskPoolScheduler.Default)
.Select(x => x.EventArgs)
.StartWith(new DataChangedEventArgs())
.Throttle(TimeSpan.FromMilliseconds(25))
.SelectMany(x =>
Observable.Start(() =>
{
Thread.Sleep(5000); // Simulate long-running calculation.
var result = 42;
return result;
}))
.ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
.Subscribe(result =>
{
// Do some interesting work with the result.
// ...
// Do something that makes the DataStore raise another event.
dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
});
Run Code Online (Sandbox Code Playgroud)
该.SelectMany(x => Observable.Start(() =>
取代.Select(x =>
允许值是其立即运行新观察到的订阅,然后将其合并值回一个可观察的.
您可能更喜欢将其写为语义相同.Select(x => Observable.Start(() => ...)).Merge()
.
这是一个简单的例子,展示了它的工作原理:
var source = new Subject<int>();
source
.SelectMany(x =>
Observable.Start(() =>
{
Thread.Sleep(1000);
return x * 2;
}))
.Subscribe(result =>
{
Console.WriteLine(result);
source.OnNext(result);
source.OnNext(result + 1);
});
source.OnNext(1);
Run Code Online (Sandbox Code Playgroud)
它产生:
2 4 6 14 12 8 10 24 28 30 26 16 20 22 18 48 50 56 52 58 60 62 54 32 34 46 44 40 42
归档时间: |
|
查看次数: |
649 次 |
最近记录: |