TBi*_*iek 3 java reactive-programming rx-java rx-android rx-java2
我正在使用RxJava,Observable里面有多个项目.我想要做的是在第一项上运行函数A,在所有项上运行函数B,在Observable完成时运行函数C :
-----1-----2-----3-----|-->
| | | |
run A | | |
| | | |
run B run B run B |
|
run C
Run Code Online (Sandbox Code Playgroud)
是否有一种聪明的方式来表达lambda函数?我已经有了以下解决方案,但它看起来很难看,我怀疑有更好的方法可以做到这一点:
observable.subscribe(
new Action1<Item>() {
boolean first = true;
@Override
public void call(Item item) {
if (first) {
runA(item);
first = false;
}
runB(fax1);
}
},
throwable -> {},
() -> runC());
Run Code Online (Sandbox Code Playgroud)
使用Observable.defer每次订阅状态来封装(是一个布尔值,表示如果我们在第一个记录).
这是演示使用的可运行类:
import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Action1;
public class DoOnFirstMain {
public static void main(String[] args) {
Observable<Integer> o =
Observable.just(1, 2, 3)
.compose(doOnFirst(System.out::println);
// will print 1
o.subscribe();
// will print 1
o.subscribe();
}
public static <T> Transformer<T, T> doOnFirst(Action1<? super T> action) {
return o -> Observable.defer(() -> {
final AtomicBoolean first = new AtomicBoolean(true);
return o.doOnNext(t -> {
if (first.compareAndSet(true, false)) {
action.call(t);
}
});
});
}
}
Run Code Online (Sandbox Code Playgroud)
虽然OP询问RxJava1,但上面的解决方案与RxJava2相同:
import java.util.concurrent.atomic.AtomicBoolean;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.functions.Consumer;
public class DoOnFirstMain {
public static void main(String[] args) {
Flowable<Integer> f =
Flowable.just(1, 2, 3)
.compose(doOnFirst(System.out::println);
// will print 1
f.subscribe();
// will print 1
f.subscribe();
}
public static <T> FlowableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
return f -> Flowable.defer(() -> {
final AtomicBoolean first = new AtomicBoolean(true);
return f.doOnNext(t -> {
if (first.compareAndSet(true, false)) {
consumer.accept(t);
}
});
});
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1021 次 |
| 最近记录: |