在Reactive Extensions for .NET中使用Observable.FromEventPattern时,如何避免任何阻塞?

Dan*_*erg 8 c# multithreading system.reactive observable

我正在努力解决关于订阅Observable.FromEventPattern()on的问题上的一些并发问题TaskPoolScheduler.

让我用代码示例来说明:

var dataStore = new DataStore();

Observable.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .Select(x => 
    {
        Thread.Sleep(5000); // Simulate long-running calculation.
        var result = 42;
        return result;
    })
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...

        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
    });

dataStore.RaiseDataChangedEvent(); // <- Returns immediately, i.e. does NOT wait for long-running calculation.
dataStore.RaiseDataChangedEvent(); // <- Blocks while waiting for the previous long-running calculation to complete, then returns eventually.
Run Code Online (Sandbox Code Playgroud)

我的问题是,当原始observable发出任何新项目Observable.FromEventPattern()时(即当DataStore对象引发新DataChanged事件时),它们似乎被阻塞,等待前面的项目完成流经整个管道.

由于订阅是在TaskPoolScheduler我预期的每一个新项目上完成的,只是简单地启动一个新任务,但实际上,如果管道繁忙,事件源似乎阻止了事件调用.

如何在其自己的任务/线程上完成执行每个新发出项(引发事件)的订阅,以便源对象永远不会阻塞其内部DataChangedEvent.Invoke()调用?

(当然,除了Subscribe()应该在UI线程上执行的lambda 之外- 已经是这种情况了.)

作为旁注:#jxnet Slack通道中提到的@jonstodle TaskPoolScheduler可能具有与我假设的语义不同的语义.具体来说,他说它可能会创建一个任务,并在该一个任务内的事件循环中进行订阅和生成值.但如果是这种情况,那么我发现第一个事件调用没有阻塞(因为第二个事件调用),这有点奇怪.在我看来,如果执行订阅的任务池任务是异步的,以至于源不必在第一次调用时阻塞,那么是否需要在第二次调用时阻止它?

Eni*_*ity 9

您遇到的问题仅仅是Rx的工作方式 - 在普通Rx管道中生成的每个值都是流水线的,并且一次只处理一个值.如果Rx管道的源,在您的情况下,FromEventPattern<DataChangedEventArgs>生成值比观察者处理它们更快,那么它们将在管道中排队.

规则是管道中的每个观察者一次只处理一个值.这适用于任何调度程序,而不仅仅是TaskPoolScheduler.

使它按照您想要的方式工作的方法非常简单 - 您创建并行管道,然后将值合并回一个管道.

这是改变:

Observable
    .FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(5000); // Simulate long-running calculation.
            var result = 42;
            return result;
        }))
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...

        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
    });
Run Code Online (Sandbox Code Playgroud)

.SelectMany(x => Observable.Start(() =>取代.Select(x =>允许值是其立即运行新观察到的订阅,然后将其合并值回一个可观察的.

您可能更喜欢将其写为语义相同.Select(x => Observable.Start(() => ...)).Merge().

这是一个简单的例子,展示了它的工作原理:

var source = new Subject<int>();

source
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(1000);
            return x * 2;
        }))
    .Subscribe(result =>
    {
        Console.WriteLine(result);
        source.OnNext(result);
        source.OnNext(result + 1);
    });

source.OnNext(1);
Run Code Online (Sandbox Code Playgroud)

它产生:

2
4
6
14
12
8
10
24
28
30
26
16
20
22
18
48
50
56
52
58
60
62
54
32
34
46
44
40
42