如何在Java Reactor中设置完全由背压驱动的流量?

Fed*_*lli 5 java reactive-programming project-reactor

我遇到需要多名工人的情况(假设2)。工人必须执行消耗上游事件的任务。

当前的任务消耗事件列表,并且具有与列表的大小无关的恒定时间。

因此,我希望上游在被请求时才发送包含所有缓冲事件的列表,一次包含一个列表。

可悲的是,大多数方法都实现了预取。发生的事情是,即使使用limitRate(1, 0)上游,也会 收到太多onRequest(1),只是为了补充下游缓冲区。

因此,我很难在工人可用时才生成缓冲列表:通常,它们是预先生成的,这错过了我最大化缓冲列表大小的目标。

如何实现这样的设置?

有没有一种方法可以完全禁用预取?

Raj*_*ani 1

不确定我是否正确理解了这个问题。显示您当前正在执行的操作的示例代码会有所帮助。

在之前不从源中提取数据的一种方法onRequestdefer实例化 Flux。所以你的代码看起来像这样:

Flux source = Flux.defer(() -> getFluxForUpstreamSource());
Run Code Online (Sandbox Code Playgroud)

使用另一种使用背压从源进行消耗的方法Flux.generate。你的代码看起来像这样:

Flux source = Flux.generate(
        UpstreamSource::getConnection,
        (connection, sink) -> {
            try {
                sink.next(connection.getNext());
            } catch (UpstreamException e) {
                sink.error(e);
            }
            return connection;
        }
);
Run Code Online (Sandbox Code Playgroud)