如何为一系列任务设计执行引擎

use*_*820 26 java concurrency multithreading

我试图在Java中编写一个问题,我必须执行一堆任务.

问题

执行由多个任务组成的作业,这些任务之间具有依赖关系.

一个作业将有一个任务列表,每个这样的任务将进一步有一个后续任务列表(每个后继任务将有自己的后续任务 - 你可以在这里看到递归性质).如果以下情况,每个后继任务都可以开始执行

  1. 它被配置为在其前任任务的部分执行时执行.在这种情况下,前任任务将通知它已部分完成,我的后续任务可以启动

  2. 成功完成其前任任务.

具有2个初始任务A和B的作业.A具有2个后继任务M和N.B具有1个后继任务P.P具有2个后继任务Y和Z.

M可以从其前任任务A的部分完成开始.Z可以在其前任任务P的部分完成时开始.N,P和Y可以仅在其前任任务A,B和P分别完成时开始.

任务层次结构(A和B可以并行启动)

我必须设计这样的工作流程/工作的执行.在设计中,我们必须确认先前任务发送的部分完成事件,以便可以启动其后继任务.我该怎么办呢?在并发中是否有适合此问题的设计模式?

Nim*_*007 7

看看akka - http://akka.io

使用akka你创建actor(事件驱动,并发实体,异步处理消息)

每个任务都可以表示为一个演员(你选择什么时候开始)

你可以在部分完成或完全完成时触发其他演员(任务)(实际上你可以随时触发它们)


Ste*_*stl 5

您的问题看起来像是Java的ForkJoin Framework的一个很好的用例.您可以将您的任务实现为RecursiveActions或RecursiveTasks(取决于您是否需要返回值),这将在您需要的任何条件下启动其子任务.您还可以控制子任务是按顺序还是并行运行.

例:

public class TaskA extends RecursiveAction {
  // ...

  protected void compute() {
    if (conditionForTaskM) {
      TaskM m = new TaskM();
      // Run task M asynchronously or use m.invoke() to run it synchronously.
      invokeAll(m);
    }

    // Run task N at the end of A
    invokeAll(new TaskN());
  }

}
Run Code Online (Sandbox Code Playgroud)

你需要一个ForkJoinPool实例来运行你的任务:

public static void main(String[] args) {
  ForkJoinPool pool = new ForkJoinPool();
  pool.submit(new TaskA());

  // Properly shutdown your pool...
}
Run Code Online (Sandbox Code Playgroud)

此示例在实现示例问题的一部分时非常简单.但一般来说,ForkJoin框架允许您创建树状结构的任务,其中每个父任务(例如A,B和P)允许您控制其直接子任务的执行.


Ank*_*pta 5

让我们从几个假设开始简化.

  1. 实际任务很少
  2. 它不是分布式环境
  3. 任务只有一个处理器
  4. 不是关键系统
  5. 性能不是必需的

:)

根据以上假设,我创建了https://gist.github.com/ankgupta/98148b8eead2fbbc2bbb

interface Task{
    public void doTask();
}

----EE.java

import java.util.*;

public class EE {
    private static EE ee = new EE();
    Map<String, Task> tasks = new HashMap<>();
    private EE(){ }

    public static EE getEE(){
        return ee;
    }

    public void register(String event, Task t){
        tasks.put(event, t);
    }

    public void message(String event){
        Task t = tasks.get(event);
        if(t!=null)
            t.doTask();
    }

}

class TaskA implements Task{
    public void doTask(){
        System.out.println("TaskA working!");
        EE.getEE().message("TASKA_PARTIAL");
        System.out.println("TaskA still working!");
        EE.getEE().message("TASKA_COMPLETE");
    }
}


class TaskB implements Task{
    public void doTask(){
        System.out.println("TaskB working!");
    }
}


class TaskC implements Task{
    public void doTask(){
        System.out.println("TaskC working!");
    }
}


public class Main{
    public static void main(String[] args){
        EE ee = EE.getEE();
        ee.register("STARTA", new TaskA());
        ee.register("TASKA_PARTIAL", new TaskB());
        ee.register("TASKA_COMPLETE", new TaskC());
        ee.message("STARTA");
    }
}
Run Code Online (Sandbox Code Playgroud)

要改进上面

  • 您应该有多个工作人员来添加并行的任务处理.
  • 对于错误处理,如果任务失败,相应的消息应该转到调用任务以便它处理它.
  • 您还应该使用真实的队列来处理事件.
  • 在多个处理器用于相同任务的情况下,使用任务分配算法.
  • 对于分布式环境,您必须处理命名,CAP ...


小智 3

如果我很好地理解您的需求,您可以使用像 Activity 这样的工作流引擎来解决它。我认为这比根据您的特定需求重新发明工作流引擎更容易。