Backpressure如何在RxJava内部发生

Qin*_*ing 2 java multithreading backpressure rx-java

我已经在RxJava上阅读了一些关于背压的文档,但我无法找到一个详细的解释,比如它是如何在图书馆内部发生的,每个人都只是总结它就像"生产者"太快而"消费者"太慢.

例如,如下面的代码:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);
Run Code Online (Sandbox Code Playgroud)

我一直在浏览RxJava源代码,所以我的理解是在主线程中我们将在每毫秒发出事件,一旦我们发出它,我们将值传递给System.out.println(i)方法并将其抛入newThead scheduler的线程池并在runnable中运行该方法.

所以我的问题是,异常是如何在内部发生的?因为当我们调用Thread.sleep()时,我们只是睡眠处理方法调用的线程 - > System.out.println()而不影响线程池中的其他线程,它怎么会导致异常.是因为线程池不再具有足够的可用线程了吗?

谢谢

aka*_*okd 6

您可以将背压视为允许一个操作员向其上游源发出许可证的系统:您可以给我128个元素.稍后这个运营商可能会说"好吧,给我另外一个96",所以总共有224个许可证未完成.一些来源,例如interval不关心许可证,只是定期分发价值.由于许可证的数量通常与队列或缓冲区中的可用容量密切相关,因此分发多于这些存储可以保持产量MissingBackpressureException.

检测到背压违规主要发生在offer有界队列返回false时,例如observeOn指示队列已满的情况.

检测违规的第二种方法是跟踪运营商中未完成的许可计数,例如onBackpressureDrop,当上游发送的数量超过此数量时,运营商根本不会转发它:

// in onBackpressureDrop
public void onNext(T value) {
    if (emitted != availablePermits) {
        emitted++;
        child.onNext(value);
    } else {
        // ignoring this value
    }
}
Run Code Online (Sandbox Code Playgroud)

子订户通过request()发出通知,通常会导致以下情况onBackpressureDrop:

public void childRequested(long n) {
    availablePermits += n;
}
Run Code Online (Sandbox Code Playgroud)

实际上,由于可能的异步执行,availablePermits是一种AtomicLong(并且被称为requested).