RxJava Observable在第一次发射时得到通知

asc*_*sco 10 java reactive-programming rx-java

我有三个Observables,我和combineLastest合并:

    Observable<String> o1 = Observable.just("1");
    Observable<String> o2 = Observable.just("2");
    Observable<String> o3 = Observable.just("3");

    Observable.combineLatest(o1, o2, o3, new Func3<String, String, String, Object>() {
        @Override
        public Object call(String s, String s2, String s3) {
            return null;
        }
    });
Run Code Online (Sandbox Code Playgroud)

我想通知其中一个Observable的第一次发射而不忽略后面的发射,我想第一个操作员会这样做.是否有一个方便的操作员(例如):

    o1.doOnFirst(new Func1<String, Void>() {
        @Override
        public Void call(String s) {
            return null;
        }
    })
Run Code Online (Sandbox Code Playgroud)

Reu*_*ani 12

如果您正在处理流,我认为您可以doOnFirst通过简单实用take:

public static <T> Observable<T> withDoOnFirst(Observable<T> source, Action1<T> action) {
    return source.take(1).doOnNext(action).concatWith(source);
}
Run Code Online (Sandbox Code Playgroud)

这样,操作仅绑定到第一个项目.

这可以更改为处理被流添加skip以支持跳过已经采取的项的observables :

public static <T> Observable<T> withDoOnFirstNonStream(Observable<T> source, Action1<T> action) {
    return source.take(1).doOnNext(action).concatWith(source.skip(1));
}
Run Code Online (Sandbox Code Playgroud)

  • 要求对源进行两次订阅可能效率低下(我们不知道任何特定可观察量可能涉及哪些开销)。我建议采用一个将一个订阅用于一般用途的答案。对于热源,您也可能会错过排放。 (2认同)

Mal*_*alt 5

我可以想到几种解决方案。第一个是doOnNext的丑陋但简单的技巧。只需在Action1指示是否已收到第一项的字段中添加一个布尔字段即可。收到后,执行您想做的任何事情,然后翻转布尔值。例如:

Observable.just("1").doOnNext(new Action1<String>() {

        boolean first = true;

        @Override
        public void call(String t) {
            if (first) {
                // Do soemthing
                first = false;
            }
        }
    });
Run Code Online (Sandbox Code Playgroud)

第二个是使用publish或订阅两次要监视的可观察对象的订阅share(),其中一个发布正在进行中first(取决于您是否要手动连接到已发布的可观察对象)。您最终将得到两个单独的可观察项,它们发出相同的项,只有第一个可观察项将在第一个发射之后停止:

ConnectableObservable<String> o1 = Observable.just("1").publish();

o1.first().subscribe(System.out::println); //Subscirbed only to the first item
o1.subscribe(System.out::println); //Subscirbed to all items

o1.connect(); //Connect both subscribers
Run Code Online (Sandbox Code Playgroud)


Leo*_*der 5

为方便起见,我为Flowable和创建了这些扩展函数Observable
注意,with doOnFirst()action 将在第一个元素发射之前被调用,whiledoAfterFirst()将首先发射第一个元素,然后执行该动作。

fun <T> Observable<T>.doOnFirst(onFirstAction: (T) -> Unit): Observable<T> =
    take(1)
        .doOnNext { onFirstAction.invoke(it) }
        .concatWith(skip(1))

fun <T> Flowable<T>.doOnFirst(onFirstAction: (T) -> Unit): Flowable<T> =
    take(1)
        .doOnNext { onFirstAction.invoke(it) }
        .concatWith(skip(1))

fun <T> Observable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Observable<T> =
    take(1)
        .doAfterNext { afterFirstAction.invoke(it) }
        .concatWith(skip(1))

fun <T> Flowable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Flowable<T> =
    take(1)
        .doAfterNext { afterFirstAction.invoke(it) }
        .concatWith(skip(1))
Run Code Online (Sandbox Code Playgroud)

用法很简单:

Flowable.fromArray(1, 2, 3)
            .doOnFirst { System.err.println("First $it") }
            .subscribe { println(it) }
Run Code Online (Sandbox Code Playgroud)

输出:

// First 1  
// 1  
// 2  
// 3
Run Code Online (Sandbox Code Playgroud)

和:

Flowable.fromArray(1, 2, 3)
            .doAfterFirst { System.err.println("First $it") }
            .subscribe { println(it) }
Run Code Online (Sandbox Code Playgroud)

输出:

// 1  
// First 1
// 2  
// 3
Run Code Online (Sandbox Code Playgroud)