Sha*_*baz 18 java performance executorservice
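I'm trying to figure out how to use Java's Executors correctly. I realize that submitting tasks to an ExecutorService has its own overhead. However, I'm surprised to see just how high it is.
My program needs to process a huge amount of data (stock market data) with as low a latency as possible. Most of the calculations are fairly simple arithmetic operations.
I tried to test something very simple: "Math.random() * Math.random()"
The simplest test runs this computation in a plain loop. The second test does the same computation inside an anonymous Runnable (this is supposed to measure the cost of creating new objects). The third test passes the Runnable to an ExecutorService (this measures the cost of introducing executors).
I ran the tests on my small laptop (2 CPUs, 1.5 GB RAM):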
(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422
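(In about one run out of four, the first two numbers end up being equal.)
Notice that the executor version takes far more time than running on a single thread. The numbers were about the same for thread pool sizes between 1 and 8.
Question: Am I missing something obvious, or are these results expected? These results tell me that any task I pass to an executor must do some non-trivial computation. If I am processing millions of messages and need to perform a very simple (and cheap) transformation on each one, I still may not be able to use executors... trying to spread the computations across multiple CPUs might end up costing more than just doing them in a single thread. The design decision becomes much more complex than I had originally thought. Any thoughts?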
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main(String[] args) throws InterruptedException {
        //warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors();

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println("simpleCompuation:"+(stop-start));

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println("computationWithObjCreation:"+(stop-start));

        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors();
        stop = System.currentTimeMillis();
        System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));
    }

    private static void computationWithObjCreation() {
        for(int i=0;i<count;i++){
            new Runnable(){
                @Override
                public void run() {
                    double x = Math.random()*Math.random();
                }
            }.run();
        }
    }

    private static void simpleCompuation() {
        for(int i=0;i<count;i++){
            double x = Math.random()*Math.random();
        }
    }

    private static void computationWithObjCreationAndExecutors()
            throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        for(int i=0;i<count;i++){
            es.submit(new Runnable() {
                @Override
                public void run() {
                    double x = Math.random()*Math.random();
                }
            });
        }
        es.shutdown();
        es.awaitTermination(10, TimeUnit.SECONDS);
    }
}
caf*_*abe 19
Edit: I changed your example and ran it on my small dual-core x200 laptop.
provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9
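As you can see in the source code, I also took the batch provisioning and the executor lifecycle out of the measurement. That is fairer when comparing against the other two methods.
See the results for yourself...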
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main( String[] args ) throws InterruptedException {

        final int cpus = Runtime.getRuntime().availableProcessors();

        final ExecutorService es = Executors.newFixedThreadPool( cpus );

        final Vector< Batch > batches = new Vector< Batch >( cpus );

        final int batchComputations = count / cpus;

        for ( int i = 0; i < cpus; i++ ) {
            batches.add( new Batch( batchComputations ) );
        }

        System.out.println( "provisioned " + cpus + " batches to be executed" );

        // warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors( es, batches );

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println( "simpleCompuation:" + ( stop - start ) );

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreation:" + ( stop - start ) );

        // Executor
        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors( es, batches );
        es.shutdown();
        es.awaitTermination( 10, TimeUnit.SECONDS );
        // Note: ExecutorService#shutdown() and ExecutorService#awaitTermination()
        // require some extra time. But the result should still be clear.
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreationAndExecutors:"
                + ( stop - start ) );
    }

    private static void computationWithObjCreation() {
        for ( int i = 0; i < count; i++ ) {
            new Runnable() {
                @Override
                public void run() {
                    double x = Math.random() * Math.random();
                }
            }.run();
        }
    }

    private static void simpleCompuation() {
        for ( int i = 0; i < count; i++ ) {
            double x = Math.random() * Math.random();
        }
    }

    private static void computationWithObjCreationAndExecutors(
            ExecutorService es, List< Batch > batches )
            throws InterruptedException {
        for ( Batch batch : batches ) {
            es.submit( batch );
        }
    }

    private static class Batch implements Runnable {

        private final int computations;

        public Batch( final int computations ) {
            this.computations = computations;
        }

        @Override
        public void run() {
            int countdown = computations;
            // "> 0" runs exactly `computations` iterations ("> -1" ran one extra)
            while ( countdown-- > 0 ) {
                double x = Math.random() * Math.random();
            }
        }
    }
}
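This is not a fair test of the thread pool: besides creating the object and running the job, the pool has additional steps to perform.
The benefit of a thread pool becomes obvious when you have real work to do and multiple threads.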
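To make the "real work and multiple threads" point concrete, here is a minimal sketch under my own assumptions. The class name `ChunkedWork`, the `work` function (a sum of sines) and the chunking scheme are illustrative and do not come from the posts above. Each submitted task carries a whole chunk of computation, so the per-task cost of the pool is paid once per chunk rather than once per operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedWork {

    // Hypothetical stand-in for a "real" job: sum of sin(i) over [from, to).
    public static double work(int from, int to) {
        double sum = 0;
        for (int i = from; i < to; i++) {
            sum += Math.sin(i);
        }
        return sum;
    }

    // Splits [0, total) into at most `workers` chunks and runs them on a fixed pool.
    public static double runInPool(int total, int workers) {
        ExecutorService es = Executors.newFixedThreadPool(workers);
        try {
            List<Callable<Double>> tasks = new ArrayList<>();
            int chunk = (total + workers - 1) / workers; // ceiling division, no item is dropped
            for (int from = 0; from < total; from += chunk) {
                final int start = from;
                final int end = Math.min(total, from + chunk);
                tasks.add(() -> work(start, end));
            }
            double sum = 0;
            for (Future<Double> f : es.invokeAll(tasks)) { // blocks until every chunk is done
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        int total = 1_000_000;
        int workers = Runtime.getRuntime().availableProcessors();
        long t0 = System.nanoTime();
        double result = runInPool(total, workers);
        long t1 = System.nanoTime();
        System.out.println("sum=" + result + " in " + (t1 - t0) / 1_000_000 + " ms");
    }
}
```

Whether this beats a single-threaded loop depends on the machine and on how expensive `work` really is, so treat the timing it prints as something to measure, not a given.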
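The "overhead" you mention has nothing to do with ExecutorService; it is caused by multiple threads synchronizing on Math.random, which creates lock contention.
So yes, you are missing something (and the "correct" answer below is not actually correct).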
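If the random values themselves are needed, one possible way around the shared generator is sketched here, under my own assumptions. `Math.random()` draws from a single generator shared by all threads, whereas `ThreadLocalRandom` (Java 7 and later) gives each thread its own. The class name `PerThreadRandom` and the task sizes are illustrative only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PerThreadRandom {

    // Runs `tasks` tasks on `threads` threads. Each task multiplies two
    // per-thread random doubles `perTask` times. Returns the number of
    // multiplications that completed.
    public static long run(int threads, int tasks, int perTask) {
        ExecutorService es = Executors.newFixedThreadPool(threads);
        AtomicLong completed = new AtomicLong();
        for (int t = 0; t < tasks; t++) {
            es.submit(() -> {
                // current() must be called on the worker thread itself
                ThreadLocalRandom rnd = ThreadLocalRandom.current();
                double sink = 0;
                for (int i = 0; i < perTask; i++) {
                    sink += rnd.nextDouble() * rnd.nextDouble();
                }
                // Reading sink keeps the loop from being trivially dead code
                if (sink >= 0) {
                    completed.addAndGet(perTask);
                }
            });
        }
        es.shutdown();
        try {
            es.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        long t0 = System.nanoTime();
        long done = run(Runtime.getRuntime().availableProcessors(), 8, 100_000);
        long t1 = System.nanoTime();
        System.out.println(done + " multiplications in " + (t1 - t0) / 1_000_000 + " ms");
    }
}
```

Swapping `rnd.nextDouble()` for `Math.random()` in the loop gives a rough way to compare the two on a given machine.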
Here is some Java 8 code that demonstrates 8 threads running a simple function without lock contention:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleFunction;

import com.google.common.base.Stopwatch;

public class ExecServicePerformance {

    private static final int repetitions = 120;
    private static int totalOperations = 250000;
    private static final int cpus = 8;
    private static final List<Batch> batches = batches(cpus);

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };

    public static void main( String[] args ) throws InterruptedException {

        printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
        printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
        printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
        printExecutionTime("Executor pool", ExecServicePerformance::executorPool);

    }

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
        long time = 0;
        for (int i = 0; i < repetitions; i++) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread
            time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
        }
        System.out.println(msg + " exec time: " + time);
    }

    private static void synchronous() {
        for ( int i = 0; i < totalOperations; i++ ) {
            performanceFunc.apply(i);
        }
    }

    private static void synchronousBatches() {
        for ( Batch batch : batches) {
            batch.synchronously();
        }
    }

    private static void asynchronousBatches() {

        CountDownLatch cb = new CountDownLatch(cpus);

        for ( Batch batch : batches) {
            Runnable r = () -> { batch.synchronously(); cb.countDown(); };
            Thread t = new Thread(r);
            t.start();
        }

        try {
            cb.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static void executorPool() {

        final ExecutorService es = Executors.newFixedThreadPool(cpus);

        for ( Batch batch : batches ) {
            Runnable r = () -> { batch.synchronously(); };
            es.submit(r);
        }

        es.shutdown();

        try {
            es.awaitTermination( 10, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

    private static List<Batch> batches(final int cpus) {
        List<Batch> list = new ArrayList<Batch>();
        for ( int i = 0; i < cpus; i++ ) {
            list.add( new Batch( totalOperations / cpus ) );
        }
        System.out.println("Batches: " + list.size());
        return list;
    }

    private static class Batch {

        private final int operationsInBatch;

        public Batch( final int ops ) {
            this.operationsInBatch = ops;
        }

        public void synchronously() {
            for ( int i = 0; i < operationsInBatch; i++ ) {
                performanceFunc.apply(i);
            }
        }
    }

}
Result timings for 120 tests of 25k operations (ms):
Winner: Executor Service.