rxjava 2 中 BackpressureStrategy.BUFFER 和 onBackpressureBuffer 运算符之间的区别

Rah*_*wal 7 java stream backpressure rx-java

我是反应式编程世界的新手,我正在尝试使用 rxjava 2 创建一个简单的背压感知消息处理。

以下是我试图实现的工作流程:

  1. 可流动的连续字符串流。

  2. 执行一个耗时的操作并将消息更改为另一个字符串

  3. 执行另一个耗时的操作。

现在我使用以下代码:

{
    Flowable.create(subscriber -> {
             some_stream.forEach(data -> {
                subscriber.onNext(data);
            });
        }, BackpressureStrategy.BUFFER).
    subscribeOn(Schedulers.io()). // Data emission will run io scheduler
    observeOn(Schedulers.computation()). // Map operation will run on computation scheduler
    map(val -> Time_Consuming_Task(val)). // Task returns another string
    observeOn(Schedulers.io()).  / Next consumer will run on computation scheduler
    subscribe(val -> Another_Time_Consuming_Task(val));
}
Run Code Online (Sandbox Code Playgroud)

现在对于小型操作,我没有看到任何与背压相关的问题。

但是对于大流,我不知道它会如何表现。

现在我的问题是:-

  1. BackpressureStrategy.BUFFER的默认缓冲区大小是多少,数据在哪里缓冲?

  2. 如果我想在每次耗时任务之前创建两个背压缓冲区,我应该使用onBackpressureBuffer 操作符吗?

  3. 如果缓冲区已满,我不想丢失数据,我想等待或在这种情况下什么?

ant*_*ori 5

回答您的问题:

1. 默认缓冲区大小因平台而异。在 JVM 上它是每个环形缓冲区 128 个项目,在 Android 上它是 16 个项目(来源

这比之前的 1024 有所降低(您可以在此处查看 RxJava 中正在实施的更改)。还有一个系统属性,您可以根据需要自行调整它:

System.setProperty("rx.ring-buffer.size", "8");
Run Code Online (Sandbox Code Playgroud)

因为它们被称为环形缓冲区,所以它们存储在内存中。您可以在此处阅读有关它们的更多信息。

2. & 3. 如果它变满了,它就会开始覆盖自己。在这种情况下,使用 onBackpressureBuffer

循环缓冲区的结果是,当它已满并执行后续写入时,它会开始覆盖最旧的数据。

引用来自维基文章关于Circular buffer

当您了解您的rx.ring-buffer.size.

onBackpressureBuffer(int capacity, // This is the given bound, not a setter for the ring buffer
    Action0 onOverflow, // The desired action to execute
    BackpressureOverflow.Strategy strategy) // The desired strategy to use
Run Code Online (Sandbox Code Playgroud)

再说一次,因为我说得再好不过了,让我引用 RxJava wiki on Backpressure (2.0)

BackpressureOverflow.Strategy 实际上是一个接口,但 BackpressureOverflow 类提供了 4 个静态字段,其实现代表了典型的操作:

  • ON_OVERFLOW_ERROR:这是前两个重载的默认行为,发出 BufferOverflowException 信号
  • ON_OVERFLOW_DEFAULT: 目前与 ON_OVERFLOW_ERROR 相同
  • ON_OVERFLOW_DROP_LATEST: 如果发生溢出,当前值将被简单地忽略,并且一旦下游请求将只传递旧值。
  • ON_OVERFLOW_DROP_OLDEST:删除缓冲区中最旧的元素并将当前值添加到它。

请注意,最后两种策略会在删除元素时导致流中的不连续性。此外,它们不会发出 BufferOverflowException 信号。

下面是一个例子:

Flowable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

值得注意的是:

ObservableRxJava 2.x 中的类型没有背压的概念。实现Observable实际上与onBackpressureBuffer()默认使用相同。UI 事件、一次性网络请求和状态更改都应该使用这种方法。的CompletableMaybeSingle类型也可以决定此行为。

如果你需要支持背压,RxJava 2.x 的新类 ,Flowable就像Observable在 RxJava 1.x 中一样支持背压。但是,更新后的库现在需要明确选择背压策略以防止出现意外MissingBackpressureExceptions

阅读更多: