saa*_*arp 3 java multithreading
最近出现了一个用例,其中我必须同时启动几个阻塞的IO任务并按顺序使用它们。我不想更改使用方的操作顺序,并且由于这是一个Web应用程序,并且它们是请求路径中的短暂任务,所以我不想在固定线程池上遇到瓶颈,并希望镜像该线程。 .Net异步/等待编码风格。这样做FutureTask<>似乎很理想,但是需要一个ExecutorService。这是试图消除对一个的需要。
操作顺序:
完事
...
我想为每个线程生成一个新线程,FutureTask<>但是简化了线程管理。后run()完成,调用线程可以加入。
我想出的解决方案是:
包com.staples.search.util;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class FutureWrapper<T> extends FutureTask<T> implements Future<T> {
private Thread myThread;
public FutureWrapper(Callable<T> callable) {
super(callable);
myThread = new Thread(this);
myThread.start();
}
@Override
public T get() {
T val = null;
try {
val = super.get();
myThread.join();
}
catch (Exception ex)
{
this.setException(ex);
}
return val;
}
}
Run Code Online (Sandbox Code Playgroud)
这里有一对夫妇的JUnit测试我创建比较FutureWrapper来CachedThreadPool。
@Test
public void testFutureWrapper() throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
int numThreads = 2000;
List<FutureWrapper<ValueHolder>> taskList = new ArrayList<FutureWrapper<ValueHolder>>();
System.out.printf("FutureWrapper: Creating %d tasks\n", numThreads);
for (int i = 0; i < numThreads; i++) {
taskList.add(new FutureWrapper<ValueHolder>(new Callable<ValueHolder>() {
public ValueHolder call() throws InterruptedException {
int value = 500;
Thread.sleep(value);
return new ValueHolder(value);
}
}));
}
for (int i = 0; i < numThreads; i++)
{
FutureWrapper<ValueHolder> wrapper = taskList.get(i);
ValueHolder v = wrapper.get();
}
System.out.printf("Test took %d ms\n", System.currentTimeMillis() - startTime);
Assert.assertTrue(true);
}
@Test
public void testCachedThreadPool() throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
int numThreads = 2000;
List<Future<ValueHolder>> taskList = new ArrayList<Future<ValueHolder>>();
ExecutorService esvc = Executors.newCachedThreadPool();
System.out.printf("CachedThreadPool: Creating %d tasks\n", numThreads);
for (int i = 0; i < numThreads; i++) {
taskList.add(esvc.submit(new Callable<ValueHolder>() {
public ValueHolder call() throws InterruptedException {
int value = 500;
Thread.sleep(value);
return new ValueHolder(value);
}
}));
}
for (int i = 0; i < numThreads; i++)
{
Future<ValueHolder> wrapper = taskList.get(i);
ValueHolder v = wrapper.get();
}
System.out.printf("Test took %d ms\n", System.currentTimeMillis() - startTime);
Assert.assertTrue(true);
}
class ValueHolder {
private int value;
public ValueHolder(int val) { value = val; }
public int getValue() { return value; }
public void setValue(int val) { value = val; }
}
Run Code Online (Sandbox Code Playgroud)
重复运行使FutureWrapperCachedThreadPool的时间为〜925ms,而〜935ms。两项测试都达到了OS线程限制。
一切似乎正常,线程产生速度非常快(10k个线程,约4秒内有随机睡眠)。有人看到这个实现有问题吗?
创建和启动数千个线程通常是一个非常糟糕的主意,因为创建线程很昂贵,并且线程数多于处理器将不会带来性能提升,但会导致线程上下文切换消耗CPU周期。(请参阅下面的注释)
因此,我认为您的测试代码在推理中包含一个大错误:您正在通过调用来模拟CPU负载Thread.sleep(500)。但是实际上,这实际上并不会导致CPU做任何事情。可能有许多并行的睡眠线程-不管您有多少个处理器,但运行的CPU消耗任务不可能比并行(实际)的处理器运行更多。
如果您模拟实际的CPU负载,您会发现,更多线程只会由于线程管理而增加开销,而不会减少总处理时间。
因此,让我们比较一下并行运行CPU消耗任务的不同方式!
首先,假设我们有一些消耗CPU的任务,这些任务总是花费相同的时间:
public Integer task() throws Exception {
// do some computations here (e.g. fibonacchi, primes, cipher, ...)
return 1;
}
Run Code Online (Sandbox Code Playgroud)
我们的目标是NUM_TASKS使用不同的执行策略来运行此任务时间。对于我们的测试,我们设置了NUM_TASKS = 2000。
(1)使用每任务线程策略
此策略与您的方法非常相似,不同之处在于,不必FutureTask继承线程并进行分类。相反,您可以FutureTask直接使用a Runnable和a Future:
@Test
public void testFutureTask() throws InterruptedException, ExecutionException {
List<RunnableFuture<Integer>> taskList = new ArrayList<RunnableFuture<Integer>>();
// run NUM_TASKS FutureTasks in NUM_TASKS threads
for (int i = 0; i < NUM_TASKS; i++) {
RunnableFuture<Integer> rf = new FutureTask<Integer>(this::task);
taskList.add(rf);
new Thread(rf).start();
}
// now wait for all tasks
int sum = 0;
for (Future<Integer> future : taskList) {
sum += future.get();
}
Assert.assertEquals(NUM_TASKS, sum);
}
Run Code Online (Sandbox Code Playgroud)
使用JUnitBenchmarks(10个测试迭代+ 5个热身迭代)运行此测试会产生以下结果:
ThreadPerformanceTest.testFutureTask: [measured 10 out of 15 rounds, threads: 1 (sequential)]
round: 0.66 [+- 0.01], round.block: 0.00 [+-
0.00], round.gc: 0.00 [+- 0.00], GC.calls: 66, GC.time: 0.06, time.total: 10.59, time.warmup: 4.02, time.bench: 6.57
Run Code Online (Sandbox Code Playgroud)
因此,一轮(方法的执行时间task())约为0.66秒。
(2)使用每CPU线程数策略
该策略使用固定数量的线程来执行所有任务。因此,我们创建一个ExecutorServicevia Executors.newFixedThreadPool(...)。线程数应等于CPU数(Runtime.getRuntime().availableProcessors()),在我的情况下为8。
为了能够跟踪结果,我们只需使用即可CompletionService。它会自动处理结果-无论结果按什么顺序到达。
@Test
public void testFixedThreadPool() throws InterruptedException, ExecutionException {
ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<Integer> ecs = new ExecutorCompletionService<Integer>(exec);
// submit NUM_TASKS tasks
for (int i = 0; i < NUM_TASKS; i++) {
ecs.submit(this::task);
}
// now wait for all tasks
int sum = 0;
for (int i = 0; i < NUM_TASKS; i++) {
sum += ecs.take().get();
}
Assert.assertEquals(NUM_TASKS, sum);
}
Run Code Online (Sandbox Code Playgroud)
同样,我们使用具有相同设置的JUnitBenchmarks运行此测试。结果是:
ThreadPerformanceTest.testFixedThreadPool: [measured 10 out of 15 rounds, threads: 1 (sequential)]
round: 0.41 [+- 0.01], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 22, GC.time: 0.04, time.total: 6.59, time.warmup: 2.53, time.bench: 4.05
Run Code Online (Sandbox Code Playgroud)
现在,一轮只有0.41秒(几乎减少了40%的运行时间)!同样不是更少的GC调用。
(3)顺序执行
为了进行比较,我们还应该测量非并行执行:
@Test
public void testSequential() throws Exception {
int sum = 0;
for (int i = 0; i < NUM_TASKS; i++) {
sum += this.task();
}
Assert.assertEquals(NUM_TASKS, sum);
}
Run Code Online (Sandbox Code Playgroud)
结果:
ThreadPerformanceTest.testSequential: [measured 10 out of 15 rounds, threads: 1 (sequential)]
round: 1.50 [+- 0.01], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+-0.00], GC.calls: 244, GC.time: 0.15, time.total: 22.81, time.warmup: 7.77, time.bench: 15.04
Run Code Online (Sandbox Code Playgroud)
请注意,1.5秒是2000次执行,因此单次执行task()需要0.75毫秒。
解释
根据阿姆达尔定律,在n个处理器上执行算法的时间T(n)为:
B是算法中无法并行化且必须按顺序运行的部分。对于纯顺序算法,B为1;对于纯并行算法,B为0(但这是不可能的,因为始终会有一些顺序开销)。
T(1)可从我们的顺序执行中得出:T(1)= 1.5 s
如果没有开销(B = 0),则在8个CPU上,我们得到:T(8)= 1.5 / 8 = 0.1875 s。
但是我们确实有开销!因此,我们为两种策略计算B:
换句话说:每个任务线程策略的开销是原来的两倍!
最后,让我们计算加速比S(n)。这是算法在n个 CPU上比顺序执行(S(1)= 1)运行得更快的次数:
应用于我们的两种策略,我们得到:
因此,每CPU线程策略的速度比每任务线程多60%。
去做
我们还应该测量和比较内存消耗。
注意:这仅适用于占用CPU的任务。如果相反,您的任务执行了许多与I / O相关的工作,则可能会受益于拥有比CPU多的线程,因为等待I / O会将线程置于空闲模式,因此CPU可以同时执行另一个线程。但是即使在这种情况下,也有一个合理的上限,通常在PC上远低于2000。