Wrapping an asynchronous computation into a synchronous (blocking) computation

Jas*_*n S 47 java concurrency asynchronous blocking

I have an object with a method I would like to expose to library clients (especially scripting clients) as something like:

interface MyNiceInterface
{
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
    public Future<Baz> doSomething(Foo fooArg, Bar barArg);
    // doSomethingAndBlock is the straightforward way;
    // doSomething has more control but deals with
    // a Future and that might be too much hassle for
    // scripting clients
}

but the primitive "stuff" I have available is a set of event-driven classes:

interface BazComputationSink
{
    public void onBazResult(Baz result);
}

class ImplementingThing
{
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink);
}

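Here ImplementingThing takes its inputs, does some arcane stuff such as enqueueing things on a task queue, and later, when a result is ready, sink.onBazResult() gets called on a thread that may or may not be the same thread that called ImplementingThing.doSomethingAsync().

Is there a way to use the event-driven functions I have, together with concurrency primitives, to implement MyNiceInterface so that scripting clients can happily wait on a blocking thread?

Edit: can I use FutureTask for this?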

Mic*_*ker 46

Use your own Future implementation:

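import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BazComputationFuture implements Future<Baz>, BazComputationSink {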

    private volatile Baz result = null;
    private volatile boolean cancelled = false;
    private final CountDownLatch countDownLatch;

    public BazComputationFuture() {
        countDownLatch = new CountDownLatch(1);
    }

    @Override
    public boolean cancel(final boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false;
        } else {
            // Set the flag before opening the latch so released waiters can see it.
            cancelled = true;
            countDownLatch.countDown();
            return true;
        }
    }

    @Override
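    public Baz get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        if (cancelled) {
            throw new CancellationException();
        }
        return result;
    }

    @Override
    public Baz get(final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // await() returns false when the timeout elapses before the latch opens.
        if (!countDownLatch.await(timeout, unit)) {
            throw new TimeoutException();
        }
        if (cancelled) {
            throw new CancellationException();
        }
        return result;
    }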

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    public void onBazResult(final Baz result) {
        this.result = result;
        countDownLatch.countDown();
    }

}

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    BazComputationFuture future = new BazComputationFuture();
    doSomethingAsync(fooArg, barArg, future);
    return future;
}

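public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (InterruptedException e) {
        // Restore the interrupt status before converting to an unchecked exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
    }
}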

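This solution creates a CountDownLatch internally, which is counted down once the callback is received. If the user calls get(), the CountDownLatch blocks the calling thread until the computation completes and the onBazResult callback is invoked. The CountDownLatch also ensures that if the callback fires before get() is called, get() returns the result immediately.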
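
The ordering guarantee described above can be checked in isolation. The following is a minimal sketch, not part of the original answer: `LatchOrderingDemo` and its `Holder` class are hypothetical names, and a `String` stands in for `Baz`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchOrderingDemo {

    /** Hypothetical one-shot result holder; a String stands in for Baz. */
    static final class Holder {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String value;

        /** Plays the role of onBazResult(): publish the value, then open the latch. */
        void complete(String v) {
            value = v;
            latch.countDown();
        }

        /** Plays the role of get(timeout, unit); returns null if the wait timed out. */
        String await(long timeout, TimeUnit unit) {
            try {
                return latch.await(timeout, unit) ? value : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    /** Callback fires first: the latch is already open, so the wait does not block. */
    static String callbackBeforeGet() {
        Holder holder = new Holder();
        holder.complete("early");
        return holder.await(0, TimeUnit.MILLISECONDS);
    }

    /** Callback fires later on another thread: the caller blocks until it arrives. */
    static String callbackAfterGet() {
        Holder holder = new Holder();
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            holder.complete("late");
        }).start();
        return holder.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(callbackBeforeGet());
        System.out.println(callbackAfterGet());
    }
}
```

Because the latch stays open once it reaches zero, the order in which the callback and the waiter arrive does not matter.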


Pau*_*and 17

Well, there is a simple solution, doing something like:

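public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
  final AtomicReference<Baz> notifier = new AtomicReference<Baz>();
  doSomethingAsync(fooArg, barArg, new BazComputationSink() {
    public void onBazResult(Baz result) {
      synchronized (notifier) {
        notifier.set(result);
        notifier.notify();
      }
    }
  });
  synchronized (notifier) {
    // The loop guards against spurious wakeups and against a result that is already set.
    while (notifier.get() == null) {
      try {
        notifier.wait();
      } catch (InterruptedException e) {
        // wait() throws a checked exception; restore the flag and give up.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
  }
  return notifier.get();
}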

Of course, this assumes that your Baz result will never be null...

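  • Not recommended: if your callback returns immediately, notify will be called before wait and you will get a deadlock (this can happen if you have caching logic) (2 upvotes)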
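
Whether the scenario in the comment actually deadlocks depends on how the wait is guarded. In the answer's code the `while (notifier.get() == null)` test runs before `wait()`, so a callback that completes synchronously should leave the value already set and the wait skipped. A minimal sketch to check this, where `ImmediateCallbackDemo`, the `Consumer<String>` sink, and the synchronous `doSomethingAsync` are hypothetical stand-ins and a `String` replaces `Baz`:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ImmediateCallbackDemo {

    /** Hypothetical async API that invokes the callback synchronously, e.g. on a cache hit. */
    static void doSomethingAsync(String input, Consumer<String> sink) {
        sink.accept("cached:" + input);
    }

    /** Same wait/notify structure as the answer, with String standing in for Baz. */
    static String doSomethingAndBlock(String input) {
        final AtomicReference<String> notifier = new AtomicReference<>();
        doSomethingAsync(input, result -> {
            synchronized (notifier) {
                notifier.set(result);
                notifier.notify(); // nobody is waiting yet, so this has no effect
            }
        });
        synchronized (notifier) {
            // The value is already set, so the condition is false and wait() is never reached.
            while (notifier.get() == null) {
                try {
                    notifier.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
        }
        return notifier.get();
    }

    public static void main(String[] args) {
        System.out.println(doSomethingAndBlock("x")); // prints "cached:x"
    }
}
```

A bare `wait()` without the guarding loop, or a callback that delivers null, would block indefinitely in the same situation.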

kyb*_*kos 12

Google's Guava library has an easy-to-use SettableFuture that makes this problem very simple (about 10 lines of code).

import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Future;

public class ImplementingThing {

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (Exception e) {
        // Keep the original exception as the cause so the failure can be diagnosed.
        throw new RuntimeException("Oh dear", e);
    }
}

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    // SettableFuture instances are obtained through the static factory method.
    final SettableFuture<Baz> future = SettableFuture.create();
    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
        @Override
        public void onBazResult(Baz result) {
            future.set(result);
        }
    });
    return future;
}

// Everything below here is just mock stuff to make the example work,
// so you can copy it into your IDE and see it run.

public static class Baz {}
public static class Foo {}
public static class Bar {}

public static interface BazComputationSink {
    public void onBazResult(Baz result);
}

public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Baz baz = new Baz();
            sink.onBazResult(baz);
        }
    }).start();
}

public static void main(String[] args) {
    System.err.println("Starting Main");
    System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
    System.err.println("Ending Main");
}

} // end of class ImplementingThing
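
If adding Guava is not an option, the same pattern can probably be expressed with `java.util.concurrent.CompletableFuture`, assuming Java 8 or later. This is a swapped-in alternative rather than the answer's own method; `CompletableFutureBridge` and the mock `doSomethingAsync` are hypothetical, and `Foo`, `Bar`, `Baz`, and `BazComputationSink` are empty stand-ins for the types in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class CompletableFutureBridge {

    // Empty stand-ins for the types in the question.
    static class Foo {}
    static class Bar {}
    static class Baz {}

    interface BazComputationSink {
        void onBazResult(Baz result);
    }

    /** Mock event-driven API: delivers the result on a separate thread. */
    static void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink) {
        new Thread(() -> sink.onBazResult(new Baz())).start();
    }

    /** Non-blocking variant: the callback completes the future. */
    static Future<Baz> doSomething(Foo fooArg, Bar barArg) {
        CompletableFuture<Baz> future = new CompletableFuture<>();
        // complete() returns a boolean, which is ignored when used as a sink.
        doSomethingAsync(fooArg, barArg, future::complete);
        return future;
    }

    /** Blocking variant: waits for the future and converts checked exceptions. */
    static Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
        try {
            return doSomething(fooArg, barArg).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.err.println(doSomethingAndBlock(new Foo(), new Bar()));
    }
}
```

Unlike the wait/notify version, this sketch also tolerates a null result, since completion is tracked separately from the value.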


Ema*_*lin 6

This is very simple with RxJava 2.x:

try {
    Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
            doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
            .toFuture().get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

Or without lambda notation:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
                @Override
                public void subscribe(SingleEmitter<Baz> emitter) {
                    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                        @Override
                        public void onBazResult(Baz result) {
                            emitter.onSuccess(result);
                        }
                    });
                }
            }).toFuture().get();

Even simpler:

Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
                doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
                .blockingGet();

Kotlin version:

val baz = Single.create<Baz> { emitter -> 
    doSomethingAsync(fooArg, barArg) { result -> emitter.onSuccess(result) } 
}.blockingGet()