在主线程中运行Flow

pau*_*aul 5 java java-9 java-flow

我再次将RxJava与Java 9 Flow进行比较.我看到Flow默认是异步的,我想知道是否有办法让它同步运行.

有时我们只是想将它用于Nio而不是糖语法,并且具有更加同质的代码.

在RxJava中,默认情况下是同步的,您可以使用observerOnsubscribeOn在管道中异步运行它.

Flow中是否有任何运算符使其在主线程中运行?

问候.

Nam*_*man 4

您可以按照使用同步执行Publisher中记录的方式定义自定义。Flow

一个非常简单的发布者,仅向单个订阅者发布(当请求时)单个 TRUE 项目。由于订阅者仅接收单个项目,因此此类不使用缓冲和排序控制。

class OneShotPublisher implements Publisher<Boolean> {
   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
   private boolean subscribed; // true after first subscribe
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // only one allowed
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (n != 0 && !completed) {
         completed = true;
         if (n < 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }
Run Code Online (Sandbox Code Playgroud)