Project Reactor:doOnNext(或其他 doOnXXX)异步

Вад*_*нюк 6 java asynchronous project-reactor reactive-streams

有没有像 doOnNext 这样的方法,但是是异步的?例如,我需要为确定的元素做一些长时间的日志记录(通过电子邮件发送通知)。

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            // For example, we need to do something time-consuming only for 3

            if (v.equals(3)) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("LOG FOR " + v);
        });

ints.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

但是为什么我要等待 3 的记录?我想异步执行此逻辑。

现在我只有这个解决方案

Thread.sleep(10000);

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Scheduler myParallel2 = Schedulers.newParallel("my-parallel2", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            Mono.just(v).publishOn(myParallel2).subscribe(value -> {
                if (value.equals(3)) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println("LOG FOR " + value);
            });
        });

ints.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

有什么“好的”解决方案吗?

Sim*_*slé 6

如果您绝对确定您不在乎电子邮件发送是否成功,那么您可以使用“subscribe-inside-doOnNext”,但我非常有信心这将是一个错误。

为了在“记录”失败时Flux传播onError信号,推荐的方法是使用flatMap.

好消息是,由于flatMap将来自内部发布者的结果立即合并到主序列中,您仍然可以立即发出每个元素并触发电子邮件。唯一需要注意的是,整个事情只能完成一次电子邮件的发送Mono也完成了。您还可以在flatMaplambda 中检查是否需要进行日志记录(而不是在内部Mono):

//assuming sendEmail returns a Mono<Void>, takes care of offsetting any blocking send onto another Scheduler

source //we assume elements are also publishOn as relevant in `source`
   .flatMap(v -> {
       //if we can decide right away wether or not to send email, better do it here
       if (shouldSendEmailFor(v)) {
           //we want to immediately re-emit the value, then trigger email and wait for it to complete
           return Mono.just(v)
               .concatWith(
                   //since Mono<Void> never emits onNext, it is ok to cast it to V
                   //which makes it compatible with concat, keeping the whole thing a Flux<V>
                   sendEmail(v).cast(V.class)
               );
       } else {
           return Mono.just(v);
       }
    });
Run Code Online (Sandbox Code Playgroud)