pau*_*aul 5 java java-9 java-flow
我再次将RxJava与Java 9 Flow进行比较.我看到Flow默认是异步的,我想知道是否有办法让它同步运行.
有时我们只是想将它用于Nio而不是糖语法,并且具有更加同质的代码.
在RxJava中,默认情况下是同步的,您可以使用observerOn和subscribeOn在管道中异步运行它.
Flow中是否有任何运算符使其在主线程中运行?
问候.
您可以按照使用同步执行Publisher中记录的方式定义自定义。Flow
一个非常简单的发布者,仅向单个订阅者发布(当请求时)单个 TRUE 项目。由于订阅者仅接收单个项目,因此此类不使用缓冲和排序控制。
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (n != 0 && !completed) {
completed = true;
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
131 次 |
| 最近记录: |