我对反应式编程和Reactor相对较新。我有一种情况,我想bufferTimeout在流中进行值的同时将其保持在我的控制之下(无限制请求),因此我可以手动请求一批值。
以下示例对此进行了说明:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
Flux<Object> flux = Flux.generate(sink -> {
try {
sink.next(queue.poll(10, TimeUnit.DAYS));
}
catch (InterruptedException e) {}
});
BaseSubscriber<List<Object>> subscriber = new BaseSubscriber<List<Object>>() {
protected void hookOnSubscribe(Subscription subscription) {
// Don't request unbounded
}
protected void hookOnNext(List<Object> value) {
System.out.println(value);
}
};
flux.subscribeOn(parallel())
.log()
.bufferTimeout(10, ofMillis(200))
.subscribe(subscriber);
subscriber.request(1);
// Offer a partial batch of values
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);
// Wait for timeout, expect [1, 2, 3, 4, 5] to …Run Code Online (Sandbox Code Playgroud)