rxJava调度程序用例

bco*_*rso 245 java multithreading thread-safety rx-java rx-android

在RxJava中有5种不同的调度程序可供选择:

  1. immediate():创建并返回一个在当前线程上立即执行工作的Scheduler.

  2. trampoline():创建并返回一个调度程序,该调度程序对当前工作完成后要执行的当前线程进行排队.

  3. newThread():创建并返回一个Scheduler,为每个工作单元创建一个新的Thread.

  4. computation():创建并返回用于计算工作的Scheduler.这可以用于事件循环,处理回调和其他计算工作.不要在此调度程序上执行IO绑定的工作.使用调度程序.io()代替.

  5. io():创建并返回一个用于IO绑定工作的Scheduler.该实现由Executor线程池支持,该线程池将根据需要增长.这可用于异步执行阻塞IO.不要在此调度程序上执行计算工作.使用调度程序.计算()而不是.

问题:

前3个调度程序非常自我解释; 但是,我对计算io有点困惑.

  1. 究竟什么是"IO限制工作"?它用于处理streams(java.io)和files(java.nio.files)吗?它用于数据库查询吗?它是用于下载文件还是访问REST API?
  2. 如何计算()从不同newThread() ?是每次所有的calculate()调用都在单个(后台)线程而不是新的(后台)线程上吗?
  3. 为什么在进行IO工作时调用calculate()会很糟糕?
  4. 为什么在进行计算工作时调用io()会很糟糕?

Dav*_*ten 322

很棒的问题,我认为文档可以提供更多细节.

  1. io()由无限制的线程池支持,这是你用于非计算密集型任务的东西,这些东西不会对CPU造成太大负担.因此,与文件系统的交互,与不同主机上的数据库或服务的交互就是很好的例子.
  2. computation()由有限的线程池支持,其大小等于可用处理器的数量.如果你试图在可用处理器之外并行安排cpu密集型工作(比如使用newThread()),那么当线程争夺处理器时,你就会面临线程创建开销和上下文切换开销,并且它可能会受到很大的性能影响.
  3. 最好只留下computation()CPU密集型工作,否则你将无法获得良好的CPU利用率.
  4. io()由于2中讨论的原因要求进行计算工作是不好的io(),如果你io()并行安排了一千个计算任务,那么这千个任务中的每一个都将拥有自己的线程并争夺CPU产生的上下文切换成本.

  • 对于网络呼叫,使用Schedulers.io(),如果需要限制同时网络呼叫的数量,请使用Scheduler.from(Executors.newFixedThreadPool(n)). (30认同)
  • 通过熟悉RxJava源码.很长一段时间,这让我感到困惑,我认为文件需要在这方面加强. (5认同)
  • 你能举一个trampoline()有用的例子吗?我理解这个概念,但我无法弄清楚我在实践中使用它的场景.它是唯一一个对我来说仍然是个谜的调度程序 (4认同)
  • 你可能会认为默认情况下将`timeout`放在`calculate()`上你会阻塞一个线程,但事实并非如此.在`计算()`下使用`ScheduledExecutorService`,因此延迟时间不会阻止.鉴于这个事实`calculate()`是一个好主意,因为如果它在另一个线程上,那么我们将受到线程切换成本的影响. (4认同)
  • @IgorGanapolsky我猜这是你很少想做的事情.为每个工作单元创建一个新线程很少有利于提高效率,因为线程构造和拆卸都很昂贵.您通常希望重用计算()和其他调度程序所执行的线程.newThread()可能合法使用(至少我能想到)的唯一一次是启动孤立的,不常见的,长时间运行的任务.即便如此,我可能会使用io()来实现这种情况. (2认同)
  • 如果正在安排的工作也安排了更多的工作,那么使用immediate()你最终会得到一个StackOverflowError.trampoline()通过排队来避免这种情况. (2认同)

sof*_*ake 5

最重要的一点是Schedulers.ioSchedulers.computation都由无界线程池支持,而不是问题中提到的其他线程池。仅当Executor使用newCachedThreadPool(不受自动回收线程池限制)创建时, Schedulers.from(Executor)才共享此特征。

正如之前的回复和网络上的多篇文章中充分解释的那样,应谨慎使用Schedulers.ioSchedulers.computation,因为它们针对其名称中的工作类型进行了优化。但是,在我看来,它们最重要的作用是为反应流提供真正的并发性

与新手的看法相反,反应式流本质上不是并发的,而是本质上异步和顺序的。正是出于这个原因,只有当 I/O 操作阻塞时才应使用Schedulers.io(例如:使用诸如 Apache IOUtils FileUtils.readFileAsString(...)之类的阻塞命令),从而会冻结调用线程,直到操作被阻塞为止。完毕。

使用异步方法(例如 Java AsynchronousFileChannel(...))不会在操作期间阻塞调用线程,因此没有必要使用单独的线程。事实上,Schedulers.io线程并不真正适合异步操作,因为它们不运行事件循环,并且回调永远不会被调用。

相同的逻辑适用于数据库访问或远程 API 调用。如果您可以使用异步或反应式 API 进行调用,请不要使用Schedulers.io 。

回到并发。您可能无法访问异步或反应式 API 来异步或并发执行 I/O 操作,因此唯一的选择是在单独的线程上分派多个调用。唉,反应式流在其末端是顺序的,但好消息是flatMap ()运算符可以在其核心引入并发性

并发性必须在流构造中构建,通常使用flatMap()运算符。这个强大的运算符可以配置为在内部为flatMap()嵌入的 Function<T, R> 提供多线程上下文。该上下文由多线程调度程序(例如Scheduler.ioScheduler.computation )提供。

在有关 RxJava2调度程序并发的文章中查找更多详细信息,您可以在其中找到有关如何顺序和并发使用调度程序的代码示例和详细说明。

希望这可以帮助,

软杰克