使用SubscribeOn时,避免在Rx中重叠OnNext调用(Scheduler.TaskPool)

Wil*_*lka 12 c# system.reactive

我有一些使用Rx的代码,从多个线程调用,它执行:

subject.OnNext(value); // where subject is Subject<T>
Run Code Online (Sandbox Code Playgroud)

我希望在后台处理这些值,所以我的订阅是

subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value =>
{
    // use value
});
Run Code Online (Sandbox Code Playgroud)

我并不关心哪些线程处理来自Observable的值,只要将工作放入TaskPool并且不阻止当前线程.但是,我在OnNext委托中使用'value'并不是线程安全的.目前,如果很多值正在通过Observable,我正在调用我的OnNext处理程序.

我可以为我的OnNext委托添加一个锁,但这不像Rx的做事方式.当我有多个线程调用时,确保我一次只调用一次OnNext处理程序的最佳方法是什么subject.OnNext(value);

Wil*_*lka 13

使用 MSDN上的主题

默认情况下,主题不会跨线程执行任何同步.[...]但是,如果要使用调度程序将传出调用与观察者同步,则可以使用Synchronize方法执行此操作.

所以你应该像布兰登在评论中所说的那样同步主题并将其交给你的制作人线程.例如

var syncSubject = Subject.Synchronize(subject);

// syncSubject.OnNext(value) can be used from multiple threads

subscription = syncSubject.ObserveOn(TaskPoolScheduler.Default).Subscribe(value =>
{
    // use value
});
Run Code Online (Sandbox Code Playgroud)

  • 调用`Synchronize`应该在调用`ObserveOn`之前,否则你违反了`ObserveOn`的并发契约.但实际上,如果用例是在多个线程之间共享主题,最好的解决方案是同步主题而不是将订阅者同步到主题:`var syncSubject = Subject.Synchronize(syncSubject);`现在手`syncSubject` out到你的生产者线程,他们可以调用`syncSubject.OnNext()`而不会导致原始主题的订阅者出现问题. (2认同)

Lee*_*ell 6

我认为您正在寻找.Synchronize()扩展方法.为了在最近的版本(2011年末)中获得性能改进,他们Rx团队放宽了关于可观察序列生成器的顺序性质的假设.然而,似乎你打破了这些假设(并不是坏事),但是为了让Rx回归正如用户所期望的那样,你应该同步序列以确保它再次顺序.