如何让这个rxjava zip并行运行?

s-h*_*ter 8 java android reactive-programming rx-java rx-android

我有一个睡眠方法来模拟一个长时间运行的过程.

private void sleep() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我有一个方法返回一个Observable,其中包含参数中给出的2个字符串的列表.它在返回字符串之前调用sleep.

private Observable<List<String>> getStrings(final String str1, final String str2) {
    return Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() {
            sleep();
            List<String> strings = new ArrayList<>();
            strings.add(str1);
            strings.add(str2);
            return strings;
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

然后我在Observalb.zip中调用了三次getStrings,我希望这三个调用并行运行,因此执行的总时间应该在2秒或3秒之内,因为睡眠只有2秒.但是,它总共花了秒钟.如何让它并行运行,以便在2秒内完成?

Observable
.zip(getStrings("One", "Two"), getStrings("Three", "Four"), getStrings("Five", "Six"), mergeStringLists())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<String>>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(List<String> strings) {
        //Display the strings
    }
});
Run Code Online (Sandbox Code Playgroud)

mergeStringLists方法

private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
    return new Func3<List<String>, List<String>, List<String>, List<String>>() {
        @Override
        public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) {
            Log.d(TAG, "...");

            for (String s : strings2) {
                strings.add(s);
            }

            for (String s : strings3) {
                strings.add(s);
            }

            return strings;
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

Bar*_*ski 15

这种情况正在发生,因为订阅你的zippedobservable发生在同一个io线程中.

为什么不试试这个:

Observable
    .zip(
        getStrings("One", "Two")
            .subscribeOn(Schedulers.newThread()),
        getStrings("Three", "Four")
            .subscribeOn(Schedulers.newThread()),
        getStrings("Five", "Six")
            .subscribeOn(Schedulers.newThread()),
        mergeStringLists())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<String>>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(List<String> strings) {
            //Display the strings
        }
    });
Run Code Online (Sandbox Code Playgroud)

如果有帮助,请告诉我

  • `Schedulers.io()`默认情况下是一个根据需要增长的线程池. (3认同)