多线程执行,保留已完成工作项的顺序

Fri*_*izz 12 java concurrency performance multithreading java.util.concurrent

我有一个工作单元流,我们称之为"工作项",按顺序处理(现在).我想通过多线程工作加快处理速度.

约束:这些工作项按特定顺序排列,在处理过程中订单不相关 - 但一旦处理完成,订单必须恢复.

像这样的东西:

   |.|
   |.|
   |4|
   |3|
   |2|    <- incoming queue
   |1|
  / | \
 2  1  3  <- worker threads
  \ | /
   |3|
   |2|    <- outgoing queue
   |1|
Run Code Online (Sandbox Code Playgroud)

我想在Java中解决这个问题,最好不要使用Executor Services,Futures等,而是使用基本的并发方法,如wait(),notify()等.

原因是:我的工作项目非常小且细粒度,它们在每个约0.2毫秒内完成处理.所以我担心使用来自java.util.concurrent.*的东西可能会引入很多开销并减慢我的代码速度.

到目前为止我发现的例子都保留了处理过程中的顺序(这与我的情况无关),并且在处理后并不关心顺序(在我的情况下这是至关重要的).

小智 6

这就是我在上一个项目中解决您的问题的方式(但是 使用 java.util.concurrent)中:

(1)WorkItem类执行实际的工作/处理:

public class WorkItem implements Callable<WorkItem> {
    Object content;
    public WorkItem(Object content) {
        super();
        this.content = content;
    }

    public WorkItem call() throws Exception {
        // getContent() + do your processing
        return this;
    }
}
Run Code Online (Sandbox Code Playgroud)

(2)此类将工作项放入队列中并启动处理:

public class Producer {
    ...
    public Producer() {
        super();
        workerQueue = new ArrayBlockingQueue<Future<WorkItem>>(THREADS_TO_USE);
        completionService = new ExecutorCompletionService<WorkItem>(Executors.newFixedThreadPool(THREADS_TO_USE));
        workerThread = new Thread(new Worker(workerQueue));
        workerThread.start();
    }

    public void send(Object o) throws Exception {
        WorkItem workItem = new WorkItem(o);
        Future<WorkItem> future = completionService.submit(workItem);
        workerQueue.put(future);
    }
}
Run Code Online (Sandbox Code Playgroud)

(3)处理完成后,工作项目在此处出队:

public class Worker implements Runnable {
    private ArrayBlockingQueue<Future<WorkItem>> workerQueue = null;

    public Worker(ArrayBlockingQueue<Future<WorkItem>> workerQueue) {
        super();
        this.workerQueue = workerQueue;
    }

    public void run() {
        while (true) {
            Future<WorkItem> fwi = workerQueue.take(); // deqeueue it
            fwi.get(); // wait for it till it has finished processing
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

(4)这是您使用代码中的内容并提交新工作的方式:

public class MainApp {
    public static void main(String[] args) throws Exception {
        Producer p = new Producer();
        for (int i = 0; i < 10000; i++)
            p.send(i);
    }
}
Run Code Online (Sandbox Code Playgroud)


小智 0

您可以为每个 WorkItem 启动一个 DoTask 线程。该线程处理工作。工作完成后,您尝试发布项目,并在控制对象上同步,在其中检查它是否是正确的 ID,如果不正确则等待。

后期实施可以是这样的:

synchronized(controllingObject) {
try {
while(workItem.id != nextId) controllingObject.wait();
} catch (Exception e) {}
//Post the workItem
nextId++;
object.notifyAll();
}
Run Code Online (Sandbox Code Playgroud)