在 rxJava 中压缩超过 9 个 Observables

Анд*_*рей 6 android observable kotlin rx-java reactivex

遇到需要做10-12个小的并行查询并合并结果的情况。但是如果有一个zip方法可以让你最多组合9个Observables,那怎么做更多我不明白。我尝试使用 zip 方法

public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)

但出现错误

java.lang.ClassCastException: java.util.ArrayList cannot be cast to io.reactivex.ObservableSource

尝试看起来像这样

List<Observable> list = new ArrayList<>();
list.add(orderRepository.getOne(54, "id"));
list.add(orderRepository.getTwo(54, "id"));
list.add(orderRepository.getThree(54, "id"));
list.add(orderRepository.getFour(54, "id"));
list.add(orderRepository.getFive());
list.add(orderRepository.getSix(54, "id"));
list.add(orderRepository.getSeven(54, "id"));
list.add(orderRepository.getEight());
list.add(orderRepository.getNine());
list.add(orderRepository.getTen(54, "id"));
list.add(orderRepository.getEleven(54, "id"));
Observable.fromIterable(list);

return Observable.zip(list,
        new Function<Object[], OrderModel>() {
            @Override
            public OrderModel apply(Object[] objects) throws Exception {
                Logger.trace("objects = ", objects);
                return new OrderModel();
            }
        });
Run Code Online (Sandbox Code Playgroud)

请举例说明如何在 Java 或 Kotlin 中执行此操作。

小智 7

    Observable one = Observable.zip(
        orderRepository.getOne(54, "id"), 
        orderRepository.getTwo(54, "id"),
        // Etc up to five (I think)
        Function zipper
    );

    Observable two = Observable.zip(
        one, orderRepository.getSix(54, "id"),
        orderRepository.getSeven(54, "id"),
        // Etc up to five,
        Function zipper
    );
Run Code Online (Sandbox Code Playgroud)

冲洗并重复,直到将所有可观察对象压缩在一起。


Han*_*rst 5

以下是如何将 Observable#zip 重载与 Iterable 一起使用的示例。你会得到一个 new Object[] 的结果,因为 Java 泛型不支持 new T[]。

在此示例中,您会看到 T 的 Observables 列表,它将在 Observable#zip 中使用。拉链函数会将每个对象转换为字符串并返回一个 T 列表。结果是 Observable> 。

另外,我建议您确保给定的可观察列表不为空。当一个空的 Observables 列表被赋予 Observable#zip 时,它将立即完成而不发出任何内容(感谢@akarnokd)。或者您可以使用 Observable#switchIfEmpty 来提供后备值(例如空列表)

@Test
  void x() {
    Observable<String> z1 = Observable.just("1");
    Observable<String> z2 = Observable.just("2");
    Observable<String> z3 = Observable.just("3");

    List<Observable<String>> observables = Arrays.asList(z1, z2, z3);

    Observable<List<String>> zip =
        Observable.zip(
            observables,
            objects -> {
              List<String> resultList =
                  Stream.of(objects).map(o -> (String) o).collect(Collectors.toList());

              return resultList;
            });

    zip.test()
        .assertNoErrors()
        .assertComplete()
        .assertValueCount(1)
        .assertValueAt(
            0,
            r -> {
              assertThat(r).contains("1", "2", "3");

              return true;
            });
  }
Run Code Online (Sandbox Code Playgroud)

  • 但是如果所有可观察量都返回不同类型的数据呢?等等 12. 遇到这种情况怎么办?这12个请求是来自不同类型的服务器的数据,是在应用程序页面上显示所必需的。我不想做类似于UI中查询顺序的事情,首先请求这个数据包,然后请求剩余的。我怀疑有人一定遇到过所有 12 种观察到的不同类型的情况,并且有必要将它们收集到一个返回的模型中 (2认同)