Java执行程序:如何在任务完成时通知而不阻塞?

Sha*_*baz 139 java callback notify executor

假设我有一个完整的任务队列,我需要提交给执行程序服务.我希望他们一次处理一个.我能想到的最简单的方法是:

  1. 从队列中获取任务
  2. 将其提交给遗嘱执行人
  3. 在返回的Future上调用.get并阻塞,直到结果可用
  4. 从队列中取出另一项任务......

但是,我试图完全避免阻塞.如果我有10,000个这样的队列,需要一次处理一个任务,我将耗尽堆栈空间,因为它们中的大多数将保持被阻塞的线程.

我想要的是提交一个任务并提供一个在任务完成时调用的回调.我将使用该回叫通知作为发送下一个任务的标志.(functionaljava和jetlang显然使用了这种非阻塞算法,但我无法理解他们的代码)

如何使用JDK的java.util.concurrent,而不是编写自己的执行器服务?

(向我提供这些任务的队列本身可以阻止,但这是一个需要解决的问题)

eri*_*son 135

定义回调接口以接收要在完成通知中传递的任何参数.然后在任务结束时调用它.

您甚至可以为Runnable任务编写一般包装器,并将其提交给ExecutorService.或者,请参阅下面的Java 8内置机制.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}
Run Code Online (Sandbox Code Playgroud)

有了CompletableFuture,Java 8包含了一个更精细的方法来组成流程,其中流程可以异步和有条件地完成.这是一个人为但完整的通知示例.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}
Run Code Online (Sandbox Code Playgroud)

  • 好的模式,我会使用[Guava的可听的未来API](http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained)提供非常好的实现. (4认同)
  • @erickson你能指定它是哪个“Callback”导入吗?那会有很大帮助。有很多,很难找到。 (2认同)
  • @Zelphir这是你声明的`Callback`接口; 不是来自图书馆.现在我可能只使用`Runnable`,`Consumer`或`BiConsumer`,这取决于我需要从任务传递回监听器. (2认同)

Mat*_*att 48

在Java 8中,您可以使用CompletableFuture.这是我在代码中的一个例子,我用它来从我的用户服务中获取用户,将它们映射到我的视图对象,然后更新我的视图或显示错误对话框(这是一个GUI应用程序):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );
Run Code Online (Sandbox Code Playgroud)

它以异步方式执行.我正在使用两种私有方法:mapUsersToUserViewsupdateView.


Pie*_*nri 47

使用Guava可听的未来API并添加回调.参看 来自网站:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
Run Code Online (Sandbox Code Playgroud)


Aug*_*ste 25

你可以扩展FutureTask类,并覆盖done()方法,然后将FutureTask对象添加到ExecutorService,所以done()方法将FutureTask立即完成时调用.


Cem*_*kas 14

ThreadPoolExecutor还有beforeExecuteafterExecute你可以覆盖和使用的钩子方法.这里是从描述ThreadPoolExecutorJavadoc中.

钩子方法

此类提供在每个任务执行之前和之后调用的受保护的可覆盖beforeExecute(java.lang.Thread, java.lang.Runnable)afterExecute(java.lang.Runnable, java.lang.Throwable)方法.这些可以用来操纵执行环境; 例如,重新初始化ThreadLocals,收集统计信息或添加日志条目.此外,terminated()可以重写方法以执行Executor完全终止后需要执行的任何特殊处理.如果钩子或回调方法抛出异常,内部工作线程可能会失败并突然终止.


Yuv*_*dam 6

用一个CountDownLatch.

它来自java.util.concurrent并且它正是在继续之前等待多个线程完成执行的方式.

为了实现您正在寻找的回调效果,这需要一些额外的额外工作.也就是说,在一个单独的线程中自己处理它,并使用CountDownLatch它并等待它,然后继续通知你需要通知什么.回调没有本机支持,或类似于该效果的任何内容.


编辑:既然我进一步理解了你的问题,我认为你走得太远,不必要.如果你定期SingleThreadExecutor,给它所有的任务,它将本地排队.


bas*_*ero 5

如果要确保没有任务同时运行,请使用SingleThreadedExecutor。任务将按照提交顺序进行处理。您甚至不需要保留任务,只需将其提交给执行人员即可。