Subscribing to a Flux from inside another subscribe in Spring WebFlux (Java)

qui*_*tin 5 java flux spring-boot project-reactor spring-webflux

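I wrote some logic with the Spring Reactor library that asynchronously fetches all operators and then fetches all devices for each operator (paginated).

I create a Flux that fetches all operators and then subscribe to it.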

    final Flux<List<OperatorDetails>> operatorDetailsFlux = reactiveResourceProvider.getOperators();
    operatorDetailsFlux
        .subscribe(operatorDetailsList -> {
          for (final OperatorDetails operatorDetails : operatorDetailsList) {
            getAndCacheDevicesForOperator(operatorDetails.getId());
          }
        });

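Now, for each operator, I fetch its devices, which takes multiple subscriptions: I subscribe to a Mono for the first page of devices and, inside that subscription, subscribe to further Monos to fetch the remaining pages asynchronously.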

private void getAndCacheDevicesForOperator(final int operatorId) {
    Mono<DeviceListResponseEntity> deviceListResponseEntityMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
        operatorId, 0);

    deviceListResponseEntityMono.subscribe(deviceListResponseEntity -> {
      // Variable renamed to lower camel case so it no longer hides the type name.
      final PaginatedResponseEntity paginatedResponseEntity = deviceListResponseEntity.getData();
      final long totalDevicesInOperator = paginatedResponseEntity.getTotalCount();

      int deviceCount = paginatedResponseEntity.getCount();
      while (deviceCount < totalDevicesInOperator) {
        final Mono<DeviceListResponseEntity> deviceListResponseEntityPageMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
            operatorId, deviceCount);

        deviceListResponseEntityPageMono.subscribe(deviceListResponseEntityPage -> {
          final List<DeviceDetails> deviceDetailsList = deviceListResponseEntityPage.getData()
              .getItems();
          // work on devices
        });

        deviceCount += DEVICE_PAGE_SIZE;
      }
    });
  }

This code works fine. But my question is: is it a good idea to subscribe to a Mono from inside another subscribe?

qui*_*tin 3

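I split this into two flows: first fetch all operators, then fetch all devices for each operator.

For pagination, I use expand (called on the first-page Mono, which turns it into a Flux of pages) to pull in all the pages.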

public Flux<OperatorDetails> getAllOperators() {
  return getOperatorsMonoWithRetryAndErrorSpec(0)
      .expand(paginatedResponse -> {
        final PaginatedEntity operatorDetailsPage = paginatedResponse.getData();
        if (morePagesAvailable(operatorDetailsPage)) {
          return getOperatorsMonoWithRetryAndErrorSpec(operatorDetailsPage.getOffset() + operatorDetailsPage.getCount());
        }
        return Mono.empty();
      })
      .flatMap(responseEntity -> Flux.fromIterable(responseEntity.getData().getItems()))
      .subscribeOn(apiScheduler);
}

public Flux<Device> getAllDevices(final int opId, final int offset) {
  return getConnectedDeviceMonoWithRetryAndErrorSpec(opId, offset)
      .expand(paginatedResponse -> {
        final PaginatedEntity deviceDetailsPage = paginatedResponse.getData();
        if (morePagesAvailable(deviceDetailsPage)) {
          return getConnectedDeviceMonoWithRetryAndErrorSpec(opId,
              deviceDetailsPage.getOffset() + deviceDetailsPage.getCount());
        }
        return Mono.empty();
      })
      .flatMap(responseEntity -> Flux.fromIterable(responseEntity.getData().getItems()))
      .subscribeOn(apiScheduler);
}
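
In case it helps to see what expand is doing in the two methods above, here is a rough plain-Java sketch (no Reactor, fully synchronous) of the same idea for this single-chain case: start from the first page, keep asking a function for the next page, and stop when it returns nothing. Everything in it is made up for illustration: Page, fetchPage, PAGE_SIZE and the local expand helper are hypothetical stand-ins, not the real API and not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class ExpandPagingSketch {

    static final int PAGE_SIZE = 3; // hypothetical page size

    /** Hypothetical stand-in for one paginated response. */
    static final class Page {
        final int offset;
        final List<String> items;
        final int totalCount;

        Page(int offset, List<String> items, int totalCount) {
            this.offset = offset;
            this.items = items;
            this.totalCount = totalCount;
        }

        /** Plays the role of morePagesAvailable(...) in the answer. */
        boolean hasMore() {
            return offset + items.size() < totalCount;
        }
    }

    /** Fake remote call: returns the slice of `all` that starts at `offset`. */
    static Page fetchPage(List<String> all, int offset) {
        int end = Math.min(offset + PAGE_SIZE, all.size());
        return new Page(offset, new ArrayList<>(all.subList(offset, end)), all.size());
    }

    /** Emits the seed, then keeps applying `next` to the latest page until it yields nothing. */
    static List<Page> expand(Page seed, Function<Page, Optional<Page>> next) {
        List<Page> pages = new ArrayList<>();
        Optional<Page> current = Optional.of(seed);
        while (current.isPresent()) {
            pages.add(current.get());
            current = next.apply(current.get());
        }
        return pages;
    }

    /** Fetches every page, then flattens the pages into one list of items. */
    static List<String> fetchAll(List<String> all) {
        List<Page> pages = expand(fetchPage(all, 0), page -> {
            if (page.hasMore()) {
                return Optional.of(fetchPage(all, page.offset + page.items.size()));
            }
            return Optional.empty(); // same role as Mono.empty() in the answer
        });
        List<String> items = new ArrayList<>();
        for (Page page : pages) {
            items.addAll(page.items); // same role as the flatMap over getItems()
        }
        return items;
    }

    public static void main(String[] args) {
        List<String> devices = Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7");
        System.out.println(fetchAll(devices)); // prints [d1, d2, d3, d4, d5, d6, d7]
    }
}
```

The Reactor version has the same shape, except that each "next page" is a Mono that is only requested once the previous page has arrived, and the result is a Flux rather than a List.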


Finally, I build a single pipeline and subscribe to it once to trigger it.

operatorDetailsFlux
    .flatMap(operatorDetails -> {
        return reactiveResourceProvider.getAllDevices(operatorDetails.getId(), 0);
    })
    .subscribe(deviceDetails -> {
      // act on devices
    });
