可以从期货中观察 - 从多个线程开始

thu*_*und 5 scala observable rx-java rx-scala

我想Observable从列表的结果中实时生成Futures.

在最简单的情况下,假设我有一个我正在运行的期货清单Future.sequence,而我只是监控他们的进度,Observable每次完成时都会告诉我.我基本上是这样做的:

  def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
    Observable[String](observer => {
      val loudFutures: List[Future[Int]] = futs.map(f => {
            f onComplete {
              case Success(a) => observer.onNext(s"just did $a more")
              case Failure(e) => observer.onError(e)
            }
            f
        })
      Future.sequence(loudFutures) onComplete {
          case Success(_) => observer.onCompleted()
          case Failure(e) => observer.onError(e)
      }
    })
  }
Run Code Online (Sandbox Code Playgroud)

这在我的测试环境中工作正常.但我刚刚读过,onNext不应该从不同的线程调用,至少不要小心没有重叠的调用.建议的解决方法是什么?似乎许多现实世界的使用Observables需要onNext从这样的异步代码中调用,但我在文档中找不到类似的例子.

MyD*_*Tom 1

可观察合约

\n\n
\n

可观察对象必须串行(而不是并行)向观察者发出通知。它们可以从不同的线程发出这些通知,但通知之间必须存在正式的happens-before关系。

\n
\n\n

看看序列化

\n\n
\n

Observable 可以异步调用其观察者\xe2\x80\x99 方法\n,可能是从不同的线程调用。这可能会使这样的 Observable 违反 Observable 约定,因为它可能会尝试在其 OnNext 通知之一之前发送 OnCompleted 或 OnError 通知,或者可能同时从两个不同的线程发出 OnNext 通知。您可以通过对其应用 Serialize 运算符来强制此类 Observable 表现良好且同步。

\n
\n