Rah*_*wal 7 java stream backpressure rx-java
我是反应式编程世界的新手,我正在尝试使用 rxjava 2 创建一个简单的背压感知消息处理。
以下是我试图实现的工作流程:
可流动的连续字符串流。
执行一个耗时的操作并将消息更改为另一个字符串
执行另一个耗时的操作。
现在我使用以下代码:
{
Flowable.create(subscriber -> {
some_stream.forEach(data -> {
subscriber.onNext(data);
});
}, BackpressureStrategy.BUFFER).
subscribeOn(Schedulers.io()). // Data emission will run io scheduler
observeOn(Schedulers.computation()). // Map operation will run on computation scheduler
map(val -> Time_Consuming_Task(val)). // Task returns another string
observeOn(Schedulers.io()). / Next consumer will run on computation scheduler
subscribe(val -> Another_Time_Consuming_Task(val));
}
Run Code Online (Sandbox Code Playgroud)
现在对于小型操作,我没有看到任何与背压相关的问题。
但是对于大流,我不知道它会如何表现。
现在我的问题是:-
BackpressureStrategy.BUFFER的默认缓冲区大小是多少,数据在哪里缓冲?
如果我想在每次耗时任务之前创建两个背压缓冲区,我应该使用onBackpressureBuffer 操作符吗?
如果缓冲区已满,我不想丢失数据,我想等待或在这种情况下什么?
这比之前的 1024 有所降低(您可以在此处查看 RxJava 中正在实施的更改)。还有一个系统属性,您可以根据需要自行调整它:
System.setProperty("rx.ring-buffer.size", "8");
Run Code Online (Sandbox Code Playgroud)
因为它们被称为环形缓冲区,所以它们存储在内存中。您可以在此处阅读有关它们的更多信息。
循环缓冲区的结果是,当它已满并执行后续写入时,它会开始覆盖最旧的数据。
引用来自维基文章关于Circular buffer。
当您了解您的rx.ring-buffer.size.
onBackpressureBuffer(int capacity, // This is the given bound, not a setter for the ring buffer
Action0 onOverflow, // The desired action to execute
BackpressureOverflow.Strategy strategy) // The desired strategy to use
Run Code Online (Sandbox Code Playgroud)
再说一次,因为我说得再好不过了,让我引用 RxJava wiki on Backpressure (2.0):
BackpressureOverflow.Strategy 实际上是一个接口,但 BackpressureOverflow 类提供了 4 个静态字段,其实现代表了典型的操作:
ON_OVERFLOW_ERROR:这是前两个重载的默认行为,发出 BufferOverflowException 信号ON_OVERFLOW_DEFAULT: 目前与 ON_OVERFLOW_ERROR 相同ON_OVERFLOW_DROP_LATEST: 如果发生溢出,当前值将被简单地忽略,并且一旦下游请求将只传递旧值。ON_OVERFLOW_DROP_OLDEST:删除缓冲区中最旧的元素并将当前值添加到它。请注意,最后两种策略会在删除元素时导致流中的不连续性。此外,它们不会发出 BufferOverflowException 信号。
下面是一个例子:
Flowable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> { },
BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
值得注意的是:
ObservableRxJava 2.x 中的类型没有背压的概念。实现Observable实际上与onBackpressureBuffer()默认使用相同。UI 事件、一次性网络请求和状态更改都应该使用这种方法。的Completable,Maybe和Single类型也可以决定此行为。如果你需要支持背压,RxJava 2.x 的新类 ,
Flowable就像Observable在 RxJava 1.x 中一样支持背压。但是,更新后的库现在需要明确选择背压策略以防止出现意外MissingBackpressureExceptions。
阅读更多:
| 归档时间: |
|
| 查看次数: |
4882 次 |
| 最近记录: |