反应性扩展OnNext

use*_*006 12 multithreading system.reactive c#-4.0

使用RX Subject,OnNext()从多个线程调用是否可以线程安全?

因此序列可以从多个源生成.

合并做同样的事情?

Ast*_*sti 16

Rx合同要求通知是连续的,并且对于多个运营商来说是合乎逻辑的必要条件.也就是说,您可以使用可用的Synchronize方法来获取此行为.

        var subject = new Subject<int>();
        var syncedSubject = Subject.Synchronize(subject);            
Run Code Online (Sandbox Code Playgroud)

您现在可以进行并发呼叫syncedSubject.对于必须同步的观察者,您还可以使用:

        var observer = Observer.Create<Unit>(...);
        var syncedObserver = Observer.Synchronize(observer);
Run Code Online (Sandbox Code Playgroud)

测试:

        Func<int, Action> onNext = i => () => syncedSubject.OnNext(i);
        Parallel.Invoke
        (
            onNext(1),
            onNext(2),
            onNext(3),
            onNext(4)
        );
Run Code Online (Sandbox Code Playgroud)


Ana*_*tts 7

调用someSubject.OnNext()与线程安全一样someList.Add()- 您可以从 > 1 个线程调用它,但不能同时调用。把你的信息写OnNext在一份lock声明中,这样就安全了。


小智 5

不,序列意味着顺序,因此不允许重叠通知.您可以使用Synchronize扩展方法来强制执行正确的同步.像Merge这样的运算符会锁定以调用下游观察者,以确保对On*回调进行正确的串行调用.