如何在Java中实现类似DAG的调度程序?

Ji *_*ANG 4 java

我想在Java中实现一个简单的类似DAG的调度程序(无需结果),如下图所示:

类似DAG的调度

我可以简单地使用手动代码来实现这个目的:

ExecutorService executor = Executors.newCachedThreadPool();
Future<?> futureA = executor.submit(new Task("A"));
Future<?> futureC = executor.submit(new Task("C"));
futureA.get();
Future<?> futureB = executor.submit(new Task("B"));
futureB.get();
futureC.get();
Future<?> futureD = executor.submit(new Task("D"));
futureD.get();
Run Code Online (Sandbox Code Playgroud)

但我正在寻找一种更通用的方法,所以我可以像这样使用调度程序:

Container container = new Container();
container.addTask("A", new Task("A"));
container.addTask("B", new Task("B"), "A");
container.addTask("C", new Task("C"));
container.addTask("D", new Task("D"), "B", "C");
container.waitForCompletion();
Run Code Online (Sandbox Code Playgroud)

实际上我已经实现了一个简单的:

https://github.com/jizhang/micro-scheduler/blob/master/src/main/java/com/shzhangji/micro_scheduler/App.java

但我需要每隔100毫秒迭代一次所有任务,看看哪一个已准备好提交.同样在这个实现中,没有异常检查.

我也检查了Guava lib的ListenableFuture,但我不知道如何正确使用它.

任何关于如何实现DAG或推荐现有开源调度程序的建议都将受到赞赏.

Dev*_*sai 6

您正在寻找的是使用谷歌的guava库,它是可听的未来界面.ListenableFutures允许您拥有复杂的异步操作链.一旦使用allAsList方法完成任务B和C,就应该实现可监听的未来来执行任务D.

可听期货的文档:https: //code.google.com/p/guava-libraries/wiki/ListenableFutureExplained

关于可听期货的教程:http: //www.javacodegeeks.com/2013/02/listenablefuture-in-guava.html

使用allAsList,chain和transform方法的示例:http://codingjunkie.net/google-guava-futures/


cra*_*eem 5

Dexecutor(免责声明:我是所有者)是您要查找的库,下面是一个示例

public class WorkFlowManager {

private final Dexecutor<String, Boolean> dexecutor;

public WorkFlowManager(ExecutorService executorService) {
    this.dexecutor = buildDexecutor(executorService);

    buildGraph();
}

private Dexecutor<String, Boolean> buildDexecutor(final ExecutorService executorService) {
    DexecutorConfig<String, Boolean> config = new DexecutorConfig<>(executorService, new WorkFlowTaskProvider());
    return new DefaultDexecutor<>(config);
}

private void buildGraph() {
    this.dexecutor.addDependency(TaskOne.NAME, TaskTwo.NAME);
    this.dexecutor.addDependency(TaskTwo.NAME, TaskThree.NAME);
    this.dexecutor.addDependency(TaskTwo.NAME, TaskFour.NAME);
    this.dexecutor.addDependency(TaskTwo.NAME, TaskFive.NAME);
    this.dexecutor.addDependency(TaskFive.NAME, TaskSix.NAME);
    this.dexecutor.addAsDependentOnAllLeafNodes(TaskSeven.NAME);
}

public void execute() {
    this.dexecutor.execute(ExecutionConfig.TERMINATING);
}
}
Run Code Online (Sandbox Code Playgroud)

这将构建以下图形,并且将相应地执行。 执行器图

请参阅我如何?了解更多

为什么要执行人

  • 超轻量
  • 超快
  • 支持立即/计划重试逻辑
  • 支持非终止行为
  • 有条件地跳过任务执行
  • 良好的测试覆盖范围使您免受伤害
  • 在Maven Central中可用
  • 大量的文档
  • 支持分布式执行(Ignite,Hazelcast,Infinispan)

有用的链接