主线程不等待订阅者完成反应式订阅者的任务

Man*_*mar 3 java spring multithreading project-reactor spring-webflux

我在春季有一项服务,需要使用十种不同的方法来获取数据。

我想让这些方法并行执行来执行一些数据库操作并返回到父线程。但父线程应该等待所有响应到来,然后返回响应。

在我当前的方法中,我使用反应式单声道异步执行所有方法,但主线程不等待订阅者方法完成。

以下是我订阅的两种方法

private Mono<BaseResponse> getProfileDetails(long profileId){
        return new Mono<BaseResponse>() {

            @Override
            public void subscribe(Subscriber<? super BaseResponse> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // DB Operation
                System.out.println("Inside getProfileDetails");
                s.onNext(new BaseResponse());
            }
        };
    }

private Mono<Address> getAddressDetails(long profileId){
        return new Mono<Address>() {

            @Override
            public void subscribe(Subscriber<? super Address> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // DB Operation
                System.out.println("Inside getAddressDetails");
                s.onNext(new Address());
            }
        };
    }
Run Code Online (Sandbox Code Playgroud)

下面是我的主要方法

public BaseResponse getDetails(long profileId){
        ExecutorService executors = Executors.newFixedThreadPool(2);

        Mono<BaseResponse> profileDetail = this.getProfileDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));
        Mono<BaseResponse> addressDetail = this.getAddressDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));

        List<BaseResponse> list = new ArrayList<>();

        profileDetail.mergeWith(addressDetail)
        .subscribe(consumer -> {
                list.add(consumer);
        });

        System.out.println("list: "+new Gson().toJson(list));
        executors.shutdown();

        return response;
    }
Run Code Online (Sandbox Code Playgroud)

下面是我的输出:

list: []
Inside getProfileDetails
Inside getAddressDetails
Run Code Online (Sandbox Code Playgroud)

我的输出显示主线程没有等待订阅者完成其任务,那么我该如何处理这种情况呢?

Mic*_*rry 6

我假设您的getProfileDetails()getAddressDetails()方法只是占位符,因为它们没有多大意义。

话虽这么说,如果这是您的整个应用程序,并且您真的只想在完成之前阻止,您不妨更改当前subscribe()对 a 的调用doOnNext(),然后只需blockLast()

profileDetail.mergeWith(addressDetail)
.doOnNext(consumer -> {
        list.add(consumer);
})
.blockLast();
Run Code Online (Sandbox Code Playgroud)

在反应式应用程序中阻塞反应式线程通常是不明智的,但在这种情况下,您实际上只想在彻底退出之前阻塞 - 所以我在这里看不到太多的缺点。