阻止RxJava映射内部无法完成

Geo*_*ell 2 deadlock blocking rx-java

以下(使用RxJava 1.2.4的错误方法)代码无法解除阻塞并且永远不会完成.

Scheduler scheduler = Schedulers.computation();
Observable.range(0, 100).map(i -> {
    System.out.println("onNext " + i);
    return Observable.just(i).subscribeOn(scheduler).toBlocking().single();
}).subscribeOn(scheduler).toBlocking().subscribe();
System.out.println("finished");
Run Code Online (Sandbox Code Playgroud)

如果将第一行更改为固定的线程池,则完成.

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(8));
Run Code Online (Sandbox Code Playgroud)

计算调度程序有什么特别之处,使第一个例子不起作用?

Yar*_*hiy 12

不要那样做

请注意有关计算调度程序的文档说明:

这可以用于事件循环,处理回调和其他计算工作.不要在此调度程序上执行IO绑定的工作.

他们想说:不要在此调度程序上执行任何阻止活动.

所以你所做的是非法的,但它可以作为一个很好的示范.

为什么发生死锁

在RxJava 2(此时为2.0.4)中发生相同的死锁.

它是由于computation调度程序的实现方式而发生的.它创建固定数量的单线程工作者(其数量是CPU核心数;在我的情况下为4).它为这些工作者分配任务的方式是简单的循环法.现在让我们看看在您的示例中将哪些任务分配给哪些工作人员.

  • worker 1 < - subscribe()call调用一个循环来生成整数range; 请注意,此任务未完成,直到所有值都传递到下游
  • worker 2 < - just(0)...toBlocking().single()表示第一个生成的整数; 这个值在没有实际阻塞的情况下立即完成,因为该值已经可用
  • worker 3 < - just(1)...toBlocking().single()表示第二个生成的整数; 这一个立即完成
  • worker 4 < - just(2)...toBlocking().single()表示第三个生成的整数; 这一个立即完成

此时我们工人1仍然忙于range循环,工人2-4闲置.接下来的任务从循环进来,它就会被分配到worker1根据循环赛:

  • worker 1 < - just(3)...toBlocking().single()表示第四个生成的整数; 这一个排队,而worker 1循环等待其结果.这是死锁.

FixedThreadPool调度程序不会锁定,因为它将任务分配给可用线程,而不是以循环方式.只要确保它有超过1个线程.

封锁是邪恶的

通常,您应该避免在Rx管道中阻止操作.Rx提供了很好的工具来执行异步任务而不会阻塞.而不是map你可以使用flatMap,例如:

Scheduler scheduler = Schedulers.computation();
Observable.range(0, 100).flatMap(i -> {
    System.out.println("onNext " + i);
    return Observable.just(i).subscribeOn(scheduler);
}).subscribeOn(scheduler).toBlocking().subscribe();
System.out.println("finished");
Run Code Online (Sandbox Code Playgroud)

即使使用单线程调度程序,这也可以工作.

而不是Observable.just(i)你可以像这样调用真正的异步任务Observable.fromFuture(asyncService(i)).

如果必要,请使用concatMap而不是flatMap保留项目的顺序.

  • 很棒的答案!它应该被接受,因为它完全回答了这个问题. (5认同)
  • @GeorgeCampbell我以为我在答案中详细描述了它. (4认同)
  • 我认为您需要考虑到防止任何可能的误用不是 RxJava 库的责任。我相信这将是一个永无止境的努力,试图实现这一目标。在这种情况下,很明显客户端代码的错误在于它在调度程序上使用了它不应该使用的阻塞代码。@YaroslavStavnichiy 完美地回答了这个问题。 (2认同)