Concat VS Merge运算符

pau*_*aul 40 java java-8 rx-java

我正在检查RXJava的文档,我注意到concat和merge运算符似乎也是这样.我写了几个测试以确定.

@Test
public void testContact() {

    Observable.concat(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
              .subscribe(System.out::println);
}

@Test
public void testMerge() {

    Observable.merge(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
            .subscribe(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)

文件说

Merge运算符也类似.它结合了两个或多个Observable的发射,但可以交错它们,而Concat从不交错来自多个Observable的发射.

但是我还是不完全明白,运行这个测试千次,合并结果总是一样的.由于订单未被授予,我期待有时"反应性""世界""你好".

代码在这里https://github.com/politrons/reactive

Art*_*ski 126

它如您所引用的文档中所述 - merge可以交错输出,而concat将首先等待更早的流完成,然后再处理后续流.在你的情况下,对于单元素,静态流,它没有任何真正的区别(但理论上,合并可以按随机顺序输出单词,并且仍然根据规范有效).如果你想看到差异,请尝试以下(之后你需要增加一些睡眠以避免提前退出)

    Observable.merge(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

    Observable.concat(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

A0 A1 A2 A3 A4 A5 A6 A7 A8

Concat将永远不会开始打印B,因为流A永远不会完成.

s/stream/observable/g;)

文档提供了很好的图表来显示差异.你需要记住,merge不能保证逐个交错项,这只是一个可能的例子.

CONCAT

Concat运营商 合并

合并运算符

  • 一个快速的说明是,如果源是同步的然后`merge` =`concat` (27认同)
  • @ArturBiesiadowski 你介意在你的答案中添加“zip”吗? (2认同)

Ami*_*har 7

康卡特

Concat 发出来自两个或多个 Observable 的发射,而不将它们交错。它将在发出项目时保持可观察对象的顺序。这意味着它将发出第一个 observable 的所有项目,然后它会发出第二个 observable 的所有项目,依此类推。

连接运算符

让我们通过一个例子来清楚地理解它。

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.concat(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Run Code Online (Sandbox Code Playgroud)

当我们使用 Concat Operator 时,它将保持顺序并将值发送为 A1、A2、A3、A4、B1、B2、B3。

合并

Merge 通过合并它们的排放将多个 Observable 合并为一个。它不会在发出项目时保持顺序。

合并运算符

让我们通过一个例子来清楚地理解它。

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.merge(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Run Code Online (Sandbox Code Playgroud)

由于我们使用 Merge Operator,它不会保持顺序,并且可以按任何顺序发出值,例如A1、B1、A2、A3、B2、B3、A4A1、A2、B1、B2、A3、A4、B3或者可以是任何东西。

这就是我们应该根据我们的用例在 RxJava 中使用 Concat 和 Merge 运算符的方式。