RxJava线程安全

Zhe*_*lov 9 java multithreading thread-safety rx-java

这段代码是线程安全的吗?

Observable<String> observable = ... // some observable that calls
                                    // onNext from a background thread

observable
  .scan(new ArrayList<String>(), (List<String> acc, String next) -> {
    acc.add(next);
    return acc;
  })
  .subscribe( list -> {
    // do somethind with sequence of lists
    ...
  });
Run Code Online (Sandbox Code Playgroud)

我很好奇,因为ArrayList不是一个线程安全的数据结构.

Lee*_*ell 6

作为一个快速回答,在.NET(原始Rx实现)中,可以假定来自可观察序列的所有值都是顺序的.这并不排除它是多线程的.但是,如果以多线程方式生成值,则可能需要通过查找.NET Synchronize()Rx运算符的等效函数来强制执行顺序性 .

另一种选择是检查ScanRxJava源代码中的实现,以验证它是否强制执行您希望/期望为累加器函数提供安全性的顺序性.