rxjava2-在线程池上执行任务的简单示例,订阅一个线程

bco*_*lan 3 java kotlin rx-java2

我正在尝试执行以下任务来使我熟悉RxJava:

  • 给定URL列表
  • 对线程池中的每个URL进行HTTP请求
  • 对于每个结果,将一些数据插入SQLite数据库(此处没有多线程)
  • 阻塞方法直到完成

所以我在Kotlin尝试了一下:

val ex = Executors.newFixedThreadPool(10)
Observable.fromIterable((1..100).toList())
    .observeOn(Schedulers.from(ex))
    .map { Thread.currentThread().name }
    .subscribe { println(it + " " + Thread.currentThread().name }
Run Code Online (Sandbox Code Playgroud)

我希望它能打印

pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....
Run Code Online (Sandbox Code Playgroud)

但是它打印:

pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
Run Code Online (Sandbox Code Playgroud)

谁能纠正我对此工作的误解?为什么不使用线程池中的所有线程?如何使订户在主线程或块上运行,直到完成?

nha*_*man 5

Rx并不意味着是并行执行服务,为此请使用Java的流api。Rx事件是同步的,并将随后流经流。构建流时,observeOn将请求一个线程,并在该线程上一个接一个地处理排放。

您还应该subscribe在主线程上执行。observeOn切换线程,所有下游事件都在该线程上发生。如果要切换到主线程,则必须在observeOn之前插入另一个subscribe