RxJava:如何表达doOnFirst()?

TBi*_*iek 3 java reactive-programming rx-java rx-android rx-java2

我正在使用RxJava,Observable里面有多个项目.我想要做的是在第一项上运行函数A,在所有项上运行函数B,在Observable完成时运行函数C :

-----1-----2-----3-----|-->
     |     |     |     |
     run A |     |     |
     |     |     |     |
     run B run B run B |
                       |
                       run C
Run Code Online (Sandbox Code Playgroud)

是否有一种聪明的方式来表达lambda函数?我已经有了以下解决方案,但它看起来很难看,我怀疑有更好的方法可以做到这一点:

observable.subscribe(
        new Action1<Item>() {
            boolean first = true;

            @Override
            public void call(Item item) {
                if (first) {
                    runA(item);
                    first = false;
                }
                runB(fax1);
            }
        },
        throwable -> {},
        () -> runC());
Run Code Online (Sandbox Code Playgroud)

Dav*_*ten 8

使用Observable.defer每次订阅状态来封装(是一个布尔值,表示如果我们在第一个记录).

这是演示使用的可运行类:

import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Action1;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Observable<Integer> o = 
            Observable.just(1, 2, 3)
                .compose(doOnFirst(System.out::println);
        // will print 1
        o.subscribe();
        // will print 1
        o.subscribe();
    }

    public static <T> Transformer<T, T> doOnFirst(Action1<? super T> action) {
        return o -> Observable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return o.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    action.call(t);
                }
            });
        });
    }

}
Run Code Online (Sandbox Code Playgroud)

虽然OP询问RxJava1,但上面的解决方案与RxJava2相同:

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.functions.Consumer;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Flowable<Integer> f =
                Flowable.just(1, 2, 3)
                        .compose(doOnFirst(System.out::println);
        // will print 1
        f.subscribe();
        // will print 1
        f.subscribe();
    }

    public static <T> FlowableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
        return f -> Flowable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return f.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    consumer.accept(t);
                }
            });
        });
    }
}
Run Code Online (Sandbox Code Playgroud)