防止flux.bufferTimeout在超时后溢出

Hos*_*omi 5 java reactive-programming project-reactor

我对反应式编程和Reactor相对较新。我有一种情况,我想bufferTimeout在流中进行值的同时将其保持在我的控制之下(无限制请求),因此我可以手动请求一批值。

以下示例对此进行了说明:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

Flux<Object> flux = Flux.generate(sink -> {
    try {
        sink.next(queue.poll(10, TimeUnit.DAYS));
    }
    catch (InterruptedException e) {}
});

BaseSubscriber<List<Object>> subscriber = new BaseSubscriber<List<Object>>() {
    protected void hookOnSubscribe(Subscription subscription) {
        // Don't request unbounded
    }

    protected void hookOnNext(List<Object> value) {
        System.out.println(value);
    }
};

flux.subscribeOn(parallel())
        .log()
        .bufferTimeout(10, ofMillis(200))
        .subscribe(subscriber);

subscriber.request(1);

// Offer a partial batch of values
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);

// Wait for timeout, expect [1, 2, 3, 4, 5] to be printed
Thread.sleep(500); 

// Offer more values
queue.offer(6);
queue.offer(7);
queue.offer(8);
queue.offer(9);
queue.offer(10);
Thread.sleep(1000);
Run Code Online (Sandbox Code Playgroud)

这是输出:

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[ INFO] (main) request(10)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-1) onNext(4)
[ INFO] (parallel-1) onNext(5)
[1, 2, 3, 4, 5]
[ INFO] (parallel-1) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-1) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-1) onNext(10)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
Run Code Online (Sandbox Code Playgroud)

我之所以如此,是因为我知道缓冲区订阅者将向上游请求10个值,这不知道超时,并且无论如何都会产生所有这些值。超时结束后,由于单人请求已完成,因此稍后提供的值仍会产生并溢出。

我想知道是否有可能防止在超时结束后产生剩余值,或者在不失去控制的情况下缓冲它们。我试过了:

  • limitRate(1)在之前bufferTimeout,尝试使缓冲区请求值“按需”。它确实要求一个接一个的请求,但是请求10次,因为缓冲区要求输入10个值。
  • onBackpressureBuffer(10),因为问题基本上是对压力的定义,如果我正确的话。尝试缓冲来自超时请求的溢出值,但这将请求无限制的值,我想避免这种情况。

看起来我将不得不实现另一种bufferTimeout实现,但是有人告诉我,编写发布者很困难。我想念什么吗?还是我在做出反应错误?

Hos*_*omi 2

通过实现我自己的订阅者解决了这个问题:

https://gist.github.com/hossomi/5edf60acb534a16c025e12e4e803d014

它仅请求所需数量的值,并在没有活动请求时缓冲接收到的值。缓冲区是无界的,因此可能需要谨慎使用或更改它。

很可能不像标准 Reactor 订阅者那么可靠,但对我来说很有效。欢迎提出建议!