将Rx Observable拆分为多个流并单独处理

Bra*_*Bil 35 reactive-programming rxjs rx-java

这是我试图完成的图片.

--abca - BBB - 一个

分裂成

--a ----- a ------- a - >一个流

---- b ------ bbb --- - > b stream

------ c ---------- - > c stream

然后,能够

a.subscribe()
b.subscribe()
c.subscribe()
Run Code Online (Sandbox Code Playgroud)

到目前为止,我发现的所有内容都使用groupBy()拆分流,但随后将所有内容折叠回单个流并在同一个函数中处理它们.我想要做的是以不同的方式处理每个派生流.

我现在正在做的方式是做一堆过滤器.有一个更好的方法吗?

Tom*_*řák 37

像馅饼一样简单,只需使用 filter

scala中的一个例子

import rx.lang.scala.Observable

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ? x == "a")
val bSource: Observable[String] = hotO.filter(x ? x == "b")
val cSource: Observable[String] = hotO.filter(x ? x == "c")

aSource.subscribe(o ? println("A: " + o), println, () ? println("A Completed"))

bSource.subscribe(o ? println("B: " + o), println, () ? println("B Completed"))

cSource.subscribe(o ? println("C: " + o), println, () ? println("C Completed"))
Run Code Online (Sandbox Code Playgroud)

您只需要确保源可观察性很热.最简单的方法就是share它.

  • @double_squeeze只需使用`publish`而不是`share`,并在订阅所有订阅者时调用`connect`. (9认同)
  • 如果您想要或初始可观察到冷,该怎么办? (2认同)

ihu*_*huk 15

你没有崩溃ObservablesgroupBy.您可以改为订阅它们.

像这样的东西:

    String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};

    Action1<String> a = s -> System.out.print("-a-");

    Action1<String> b = s -> System.out.print("-b-");

    Action1<String> c = s -> System.out.print("-c-");

    Observable
            .from(inputs)
            .groupBy(s -> s)
            .subscribe((g) -> {
                if ("a".equals(g.getKey())) {
                    g.subscribe(a);
                }

                if ("b".equals(g.getKey())) {
                    g.subscribe(b);
                }

                if ("c".equals(g.getKey())) {
                    g.subscribe(c);
                }
            });
Run Code Online (Sandbox Code Playgroud)

如果语句看起来有点难看,但至少你可以分别处理每个流.也许有一种方法可以避免它们.

  • 我在调试时发现了一些有趣的事情(注意这是使用Rx JS),一旦"组"与处理程序相关联,该组中元素的另一个实例将不会通过上面显示的大订阅函数.相反,它会直接转发到该组订阅的Action.然而,通过过滤器,可以对源可观察的每个值进行比较.因此,对于N个分裂,将对每个元素进行可能的N比较.对于组,仅在找到新组时才进行比较. (4认同)
  • 布兰登比尔,我不得不重读你的问题,弄清楚你最初使用的是"过滤器".出于好奇,是什么让你更喜欢这个`groupBy`解决方案的`filter`一个?对我来说,"过滤器"似乎更简单,更容易理解,可能表现更好.你在`groupBy`中看到了什么优势? (2认同)