Fri*_*izz 12 java concurrency performance multithreading java.util.concurrent
我有一个工作单元流,我们称之为"工作项",按顺序处理(现在).我想通过多线程工作加快处理速度.
约束:这些工作项按特定顺序排列,在处理过程中订单不相关 - 但一旦处理完成,订单必须恢复.
像这样的东西:
|.|
|.|
|4|
|3|
|2| <- incoming queue
|1|
/ | \
2 1 3 <- worker threads
\ | /
|3|
|2| <- outgoing queue
|1|
Run Code Online (Sandbox Code Playgroud)
我想在Java中解决这个问题,最好不要使用Executor Services,Futures等,而是使用基本的并发方法,如wait(),notify()等.
原因是:我的工作项目非常小且细粒度,它们在每个约0.2毫秒内完成处理.所以我担心使用来自java.util.concurrent.*的东西可能会引入很多开销并减慢我的代码速度.
到目前为止我发现的例子都保留了处理过程中的顺序(这与我的情况无关),并且在处理后并不关心顺序(在我的情况下这是至关重要的).
小智 6
这就是我在上一个项目中解决您的问题的方式(但是 使用 java.util.concurrent)中:
(1)WorkItem类执行实际的工作/处理:
public class WorkItem implements Callable<WorkItem> {
Object content;
public WorkItem(Object content) {
super();
this.content = content;
}
public WorkItem call() throws Exception {
// getContent() + do your processing
return this;
}
}
Run Code Online (Sandbox Code Playgroud)
(2)此类将工作项放入队列中并启动处理:
public class Producer {
...
public Producer() {
super();
workerQueue = new ArrayBlockingQueue<Future<WorkItem>>(THREADS_TO_USE);
completionService = new ExecutorCompletionService<WorkItem>(Executors.newFixedThreadPool(THREADS_TO_USE));
workerThread = new Thread(new Worker(workerQueue));
workerThread.start();
}
public void send(Object o) throws Exception {
WorkItem workItem = new WorkItem(o);
Future<WorkItem> future = completionService.submit(workItem);
workerQueue.put(future);
}
}
Run Code Online (Sandbox Code Playgroud)
(3)处理完成后,工作项目在此处出队:
public class Worker implements Runnable {
private ArrayBlockingQueue<Future<WorkItem>> workerQueue = null;
public Worker(ArrayBlockingQueue<Future<WorkItem>> workerQueue) {
super();
this.workerQueue = workerQueue;
}
public void run() {
while (true) {
Future<WorkItem> fwi = workerQueue.take(); // deqeueue it
fwi.get(); // wait for it till it has finished processing
}
}
}
Run Code Online (Sandbox Code Playgroud)
(4)这是您使用代码中的内容并提交新工作的方式:
public class MainApp {
public static void main(String[] args) throws Exception {
Producer p = new Producer();
for (int i = 0; i < 10000; i++)
p.send(i);
}
}
Run Code Online (Sandbox Code Playgroud)
小智 0
您可以为每个 WorkItem 启动一个 DoTask 线程。该线程处理工作。工作完成后,您尝试发布项目,并在控制对象上同步,在其中检查它是否是正确的 ID,如果不正确则等待。
后期实施可以是这样的:
synchronized(controllingObject) {
try {
while(workItem.id != nextId) controllingObject.wait();
} catch (Exception e) {}
//Post the workItem
nextId++;
object.notifyAll();
}
Run Code Online (Sandbox Code Playgroud)