Project Reactor 的 flatMap 中线程的混乱

dev*_*123 5 java spring spring-boot project-reactor spring-webflux

我正在研究 Project Reactor 和反应式 MongoDB 存储库。我有以下代码:

@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
    @Id
    Integer id;
    String name;
}
Run Code Online (Sandbox Code Playgroud)
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}
Run Code Online (Sandbox Code Playgroud)

和主@SpringBootApplication类:

@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {

    private final ReactivePersonRepository reactivePersonRepository;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveDatabaseApplication.class, args);
    }

    @PostConstruct
    public void postConstruct() {
        Scheduler single = Schedulers.newSingle("single-scheduler");
        IntStream.range(0, 10).forEach(i ->
                Flux.just(Person.builder()
                        .id(i)
                        .name("PersonName")
                        .build())
                        .flatMap(personToSave -> {
                            System.out.println(String.format(
                                    "Saving person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.save(personToSave);
                        })
                        //.publishOn(single)
                        .flatMap(savedPerson -> {
                            System.out.println(String.format(
                                    "Finding person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.findById(savedPerson.getId());
                        })
                        //.publishOn(single)
                        .flatMap(foundPerson -> {
                            System.out.println(String.format(
                                    "Deleting person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.deleteById(foundPerson.getId());
                        })
                        //.publishOn(single)
                        .subscribeOn(single)
                        .subscribe(aVoid -> System.out.println(String.format(
                                "Subscription from thread %s", Thread.currentThread().getName()))));
    }
}
Run Code Online (Sandbox Code Playgroud)

方法Flux::subscribeOn描述说:

因此,将此运算符放置在链中的任何位置也会影响从链的开头到下一次出现的 onNext/onError/onComplete 信号的执行上下文。

这让我有点困惑,因为当我publishOn在处理链中没有指定任何内容时,线程名称的打印值是:

从线程 single-scheduler-1 中拯救人员 - 正如预期的那样

从线程中查找人员 Thread-13

从线程 Thread-6 中查找人员

从线程 Thread-15 中查找人员

从线程 Thread-6 中删除人员

从线程 Thread-5 中删除人员

从线程 Thread-4 中删除人员

我不明白为什么。方法中指定的调度程序不应该subscribeOn用于每次flatMap执行吗?

当我取消注释publishOn行时,所有内容都由给定的单个调度程序执行,这是预期的。

谁能解释为什么操作不使用单个调度程序flatMap,而没有publishOn

Mic*_*rry 5

这个人为的例子可能会更清楚:

Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
        .flatMap(x -> {
            System.out.println(String.format(
                    "Saving person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Finding person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Deleting person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .subscribeOn(single)
        .subscribe(aVoid -> System.out.println(String.format(
        "Subscription from thread %s", Thread.currentThread().getName())));
Run Code Online (Sandbox Code Playgroud)

这将给出类似的内容:

Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4
Run Code Online (Sandbox Code Playgroud)

或者,换句话说,您的反应存储库没有在同一个调度程序上发布,就是您看到您所做的行为的原因。“直到下一次出现publishOn()”并不意味着您的代码下次调用publishOn()- 它也可以在您的任何flatMap()调用中的任何发布者中,您将无法控制。