小编Hos*_*omi的帖子

防止flux.bufferTimeout在超时后溢出

我对反应式编程和Reactor相对较新。我有一种情况,我想bufferTimeout在流中进行值的同时将其保持在我的控制之下(无限制请求),因此我可以手动请求一批值。

以下示例对此进行了说明:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

Flux<Object> flux = Flux.generate(sink -> {
    try {
        sink.next(queue.poll(10, TimeUnit.DAYS));
    }
    catch (InterruptedException e) {}
});

BaseSubscriber<List<Object>> subscriber = new BaseSubscriber<List<Object>>() {
    protected void hookOnSubscribe(Subscription subscription) {
        // Don't request unbounded
    }

    protected void hookOnNext(List<Object> value) {
        System.out.println(value);
    }
};

flux.subscribeOn(parallel())
        .log()
        .bufferTimeout(10, ofMillis(200))
        .subscribe(subscriber);

subscriber.request(1);

// Offer a partial batch of values
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);

// Wait for timeout, expect [1, 2, 3, 4, 5] to …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor

5
推荐指数
1
解决办法
495
查看次数