一旦观察者得到理想的结果,就可以安全/有效地处理活跃的Observable

Sha*_*awn 0 .net c# multithreading reactive-programming system.reactive

我已经订阅了一个推送内容的频率很高的Observable,这些内容来自网络I/O,所以每次推送都来自不同的线程,然后我有一些观察者可能会尝试获取一些内容然后快速取消订阅以确保有没有其他内容传入,所以代码示例如下:

        IDisposable dsp = null;
        dsp = TargetObservable.Subscribe((incomingContent) =>
        {
            if (incomingContent == "something")
            {
                myList.Add(incomingContent);
                dsp.Dispose();
            }
            else
            {
                otherList.Add(incomingContent);
            }
        });
Run Code Online (Sandbox Code Playgroud)

现在,OnNext显然不是线程安全的,意味着当Observer在调用Dispose()之前得到"东西"时,其他内容可能仍然传入并添加到'otherList',即使我放了一个'lock(.. .)'为整个'onNext(...)'.
这不是我们想要的,所以任何想法都要避免这种情况?我可以考虑的一种方法是修改Observable逐个推送内容(通过使用'lock'),然后性能必须伤害很多.谢谢.

Mar*_*age 5

要使用Rx,您需要遵循Rx指南.在你的情况下有4.2问题假设观察者实例以序列化方式调用,解决方案是使用Synchronize哪个基本上介绍lock你想要避免的.如果您lock在代码中无法承担声明,则需要在将网络事件发送到Rx之前编写自己的"廉价"同步.

使用同步序列,您可以OnNext使用Rx LINQ运算符来简化处理程序中的代码,例如TakeWhile:

var subscription = TargetObservable
  .Synchronize()
  .TakeWhile(incomingContent => incomingContent != "something"))
  .Subscribe( ... );
Run Code Online (Sandbox Code Playgroud)

或者您可以创建自己的运算符TakeWhileInclusive以包含谓词为false的最后一项:

static class ObservableExtensions {

  public static IObservable<TSource> TakeWhileInclusive<TSource>(
       this IObservable<TSource> source, 
       Func<TSource, Boolean> predicate) {
    return Observable
      .Create<TSource>(
        observer => source.Subscribe(
          item => {
            observer.OnNext(item);
            if (!predicate(item))
              observer.OnCompleted();
          },
          observer.OnError,
          observer.OnCompleted
        )
      );
  }
}
Run Code Online (Sandbox Code Playgroud)

  • @Shawn:你不需要在`OnNext`处理程序中处理订阅.相反,当触发`OnCompleted`时,订阅将与底层的`IObservable`断开连接.在我的例子中,当谓词变为假并且订阅将被断开时,`TakeWhile`将触发`OnCompleted`.如果要"从外部"关闭,例如在关机或类似活动期间,处置订阅非常有用. (2认同)
  • @Shawn:您可以使用`ObserveOn`在不同的线程上调度传入的事件.然后,"Synchronize"锁定仅在排队传入网络数据包所需的持续时间内进行.根据我的经验,Rx是非常有效的事件,虽然它引入了一些正确处理并发的开销.如果您认为此开销太大,则需要使用较低级别的原语来处理并发,并且错误的概率要高得多.最好避免过早优化,所以也许你应该在花费太多努力避免之前评估锁定的成本? (2认同)