ExecutorService's surprising performance break-even point --- rules of thumb?

Sha*_*baz 18 java performance executorservice

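I'm trying to figure out how to use Java's Executors correctly. I realize that submitting tasks to an ExecutorService has its own overhead. However, I'm surprised to see how high it is.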

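My program needs to process huge amounts of data (stock market data) with the lowest possible latency. Most of the calculations are fairly simple arithmetic operations.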

I tried testing something very simple: "Math.random() * Math.random()"

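The simplest test runs this computation in a plain loop. The second test does the same computation inside an anonymous Runnable (this is supposed to measure the cost of creating new objects). The third test passes the Runnable to an ExecutorService (this measures the cost of introducing executors).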

I ran the tests on my dinky laptop (2 CPUs, 1.5 GB RAM):

(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422

(in about four out of five runs, the first two numbers come out equal)

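Notice that the executor version takes far, far longer than executing on a single thread. The numbers were about the same for thread pool sizes between 1 and 8.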
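A rough back-of-the-envelope reading of these timings, assuming the 62 ms run approximates the cost of the work itself and that all of the extra time in the third run is per-task executor overhead (the answers dispute how much of it is really contention on Math.random). If each submitted task performs b computations, the fraction f(b) of time spent on overhead is roughly:

$$t_{\text{work}} \approx \frac{62\ \text{ms}}{100\,000} = 0.62\ \mu\text{s}, \qquad t_{\text{exec}} \approx \frac{422\ \text{ms}}{100\,000} = 4.22\ \mu\text{s}, \qquad t_{\text{over}} = t_{\text{exec}} - t_{\text{work}} \approx 3.6\ \mu\text{s}$$

$$f(b) = \frac{t_{\text{over}}}{t_{\text{over}} + b\,t_{\text{work}}}, \qquad f(1) = \frac{3.6}{4.22} \approx 0.85, \qquad f(1000) = \frac{3.6}{623.6} \approx 0.006$$

Under these assumptions, a task doing a single multiplication spends most of its time in executor bookkeeping, while a task bundling a thousand of them makes that fixed cost negligible.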

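Question: Am I missing something obvious, or are these results expected? They tell me that any task I pass to an executor must do some non-trivial computation. If I am processing millions of messages and need to perform a very simple (and cheap) transformation on each one, I still may not be able to use executors: trying to spread the computations across multiple CPUs might end up costing more than just doing them on a single thread. The design decision becomes much more complex than I originally thought. Any thoughts?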


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

 private static int count = 100000;

 public static void main(String[] args) throws InterruptedException {

  //warmup
  simpleCompuation();
  computationWithObjCreation();
  computationWithObjCreationAndExecutors();

  long start = System.currentTimeMillis();
  simpleCompuation();
  long stop = System.currentTimeMillis();
  System.out.println("simpleCompuation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreation();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreationAndExecutors();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));


 }

 private static void computationWithObjCreation() {
  for(int i=0;i<count;i++){
   new Runnable(){

    @Override
    public void run() {
     double x = Math.random()*Math.random();
    }

   }.run();
  }

 }

 private static void simpleCompuation() {
  for(int i=0;i<count;i++){
   double x = Math.random()*Math.random();
  }

 }

 private static void computationWithObjCreationAndExecutors()
   throws InterruptedException {

  ExecutorService es = Executors.newFixedThreadPool(1);
  for(int i=0;i<count;i++){
   es.submit(new Runnable() {
    @Override
    public void run() {
     double x = Math.random()*Math.random();     
    }
   });
  }
  es.shutdown();
  es.awaitTermination(10, TimeUnit.SECONDS);
 }
}

caf*_*abe 19

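  1. Using executors is about utilizing CPUs and/or CPU cores, so if you create a thread pool that makes the best use of your CPUs, you need as many threads as there are CPUs/cores.
  2. You're right, creating new objects costs too much. So one way to reduce the expense is to use batches. If you know the kind and number of computations to perform, you can create batches: think of a thousand computations done in one executed task. You create a batch for each thread. As soon as a computation is done (java.util.concurrent.Future), you create the next batch. The new batches can even be created in parallel (4 CPUs -> 3 threads for computation, 1 thread for batch provisioning). In the end, you may get higher throughput, but with higher memory demands (batches, provisioning).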
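A minimal sketch of the batching idea from point 2, assuming a deterministic stand-in workload so the result can be checked (the `work` method and the `BatchedSubmit`/`sumInBatches` names are hypothetical, not from the answer). Each batch is a single `Callable`, and `ExecutorService.invokeAll` returns the `Future`s only after every batch has finished, so the executor overhead is paid once per batch rather than once per computation. It does not attempt the separate provisioning thread described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchedSubmit {

    // Hypothetical cheap per-item work standing in for the "simple arithmetic"
    // in the question; deterministic so the total can be verified.
    static double work(int i) {
        return i * 0.5;
    }

    // Splits [0, total) into `batches` contiguous ranges and submits each range
    // as ONE Callable, so queueing/hand-off costs are paid per batch, not per item.
    static double sumInBatches(ExecutorService es, int total, int batches)
            throws InterruptedException, ExecutionException {
        List<Callable<Double>> tasks = new ArrayList<>();
        int chunk = (total + batches - 1) / batches; // ceiling division
        for (int b = 0; b < batches; b++) {
            final int from = b * chunk;
            final int to = Math.min(total, from + chunk); // empty range if from >= total
            tasks.add(() -> {
                double sum = 0;
                for (int i = from; i < to; i++) {
                    sum += work(i);
                }
                return sum;
            });
        }
        double result = 0;
        // invokeAll blocks until every batch has completed, so each get() returns immediately.
        for (Future<Double> f : es.invokeAll(tasks)) {
            result += f.get();
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        int cpus = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(cpus); // one thread per core, as in point 1
        try {
            System.out.println(sumInBatches(es, 100_000, cpus)); // prints 2.499975E9
        } finally {
            es.shutdown();
        }
    }
}
```

The pool is created once and reused, which keeps thread start-up out of the per-batch cost; the batch count is a tuning knob, and one batch per core is only a starting point.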

Edit: I changed your example and ran it on my little dual-core x200 laptop.

provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9

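As you can see in the source code, I also took the batch provisioning and the executor creation out of the measurement (shutdown and termination are still timed, as the comment in the code notes). That makes it a fairer comparison with the other two methods.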

See the results for yourself...

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main( String[] args ) throws InterruptedException {

        final int cpus = Runtime.getRuntime().availableProcessors();

        final ExecutorService es = Executors.newFixedThreadPool( cpus );

        final Vector< Batch > batches = new Vector< Batch >( cpus );

        final int batchComputations = count / cpus;

        for ( int i = 0; i < cpus; i++ ) {
            batches.add( new Batch( batchComputations ) );
        }

        System.out.println( "provisioned " + cpus + " batches to be executed" );

        // warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors( es, batches );

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println( "simpleCompuation:" + ( stop - start ) );

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreation:" + ( stop - start ) );

        // Executor

        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors( es, batches );    
        es.shutdown();
        es.awaitTermination( 10, TimeUnit.SECONDS );
        // Note: ExecutorService#shutdown() and ExecutorService#awaitTermination()
        // require some extra time, but the result should still be clear.
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreationAndExecutors:"
                + ( stop - start ) );
    }

    private static void computationWithObjCreation() {

        for ( int i = 0; i < count; i++ ) {
            new Runnable() {

                @Override
                public void run() {

                    double x = Math.random() * Math.random();
                }

            }.run();
        }

    }

    private static void simpleCompuation() {

        for ( int i = 0; i < count; i++ ) {
            double x = Math.random() * Math.random();
        }

    }

    private static void computationWithObjCreationAndExecutors(
            ExecutorService es, List< Batch > batches )
            throws InterruptedException {

        for ( Batch batch : batches ) {
            es.submit( batch );
        }

    }

    private static class Batch implements Runnable {

        private final int computations;

        public Batch( final int computations ) {

            this.computations = computations;
        }

        @Override
        public void run() {

            int countdown = computations;
            // performs exactly `computations` iterations
            while ( countdown-- > 0 ) {
                double x = Math.random() * Math.random();
            }
        }
    }
}

  • Math.random is synchronized, so the multithreaded test isn't really a valid performance comparison against single-threaded execution. (2 upvotes)
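If the workload genuinely needs random numbers, the Math.random Javadoc itself suggests giving each thread its own generator to reduce contention. A sketch of that, assuming Java 8 (ThreadLocalRandom, DoubleAdder, lambdas); the class and method names are made up for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.DoubleAdder;

public class ThreadLocalRandomBatch {

    // Runs `computations` random multiplications using the generator owned by
    // the calling thread, so concurrent batches do not contend on the single
    // shared generator that all Math.random() callers use.
    static double runBatch(int computations) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current(); // obtained on the worker thread itself
        double acc = 0;
        for (int i = 0; i < computations; i++) {
            acc += rnd.nextDouble() * rnd.nextDouble(); // each product is in [0, 1)
        }
        return acc; // returning the sum keeps the work observable
    }

    public static void main(String[] args) throws InterruptedException {
        int cpus = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(cpus);
        DoubleAdder total = new DoubleAdder(); // low-contention accumulator for batch results
        int perBatch = 100_000 / cpus;
        for (int b = 0; b < cpus; b++) {
            es.submit(() -> total.add(runBatch(perBatch)));
        }
        es.shutdown();
        es.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("sum of products: " + total.sum());
    }
}
```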

ZZ *_*der 7

This is not a fair test of the thread pool, for the following reasons:

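  1. You are not taking advantage of the pool at all, because you only have 1 thread.
  2. The job is too simple to justify the pooling overhead. With an FPU, a multiplication takes only a few CPU cycles.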

Consider the following extra steps the thread pool has to perform besides creating the object and running the job:

  1. Put the job into the queue
  2. Remove the job from the queue
  3. Get a thread from the pool and execute the job
  4. Return the thread to the pool

When you have real work and multiple threads, the benefit of the thread pool will be apparent.
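To make those four steps concrete, here is a deliberately simplified model of a fixed pool built on a BlockingQueue. It is a hypothetical sketch (the TinyPool class and its STOP sentinel are made up), not ThreadPoolExecutor's actual implementation, but the comments map each line to the steps listed above:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TinyPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    // Sentinel job telling a worker to exit (hypothetical helper for this sketch).
    private static final Runnable STOP = () -> { };

    TinyPool(int threads) {
        workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        Runnable job = queue.take(); // step 2: remove the job from the queue
                        if (job == STOP) {
                            return;
                        }
                        job.run();                   // step 3: execute it on a pooled thread
                        // step 4: looping back to take() is how the thread "returns" to the pool
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
    }

    void submit(Runnable job) throws InterruptedException {
        queue.put(job); // step 1: put the job into the queue
    }

    // Queues one STOP per worker behind all pending jobs (FIFO), then waits for the workers.
    void shutdownAndWait() throws InterruptedException {
        for (int i = 0; i < workers.length; i++) {
            queue.put(STOP);
        }
        for (Thread w : workers) {
            w.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TinyPool pool = new TinyPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 1000; i++) {
            pool.submit(done::incrementAndGet); // each "job" is a single increment
        }
        pool.shutdownAndWait();
        System.out.println(done.get()); // prints 1000
    }
}
```

Every job pays for a queue put, a queue take, and a hand-off between threads, so a job as small as one increment or one multiplication is dominated by this bookkeeping.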


adr*_*nos 5

The "overhead" you mention has nothing to do with ExecutorService; it is caused by multiple threads synchronizing on Math.random, which creates lock contention.

So yes, you are missing something (and the accepted answer is not actually correct).

Here is some Java 8 code demonstrating 8 threads running a simple function without lock contention:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleFunction;

import com.google.common.base.Stopwatch;

public class ExecServicePerformance {

    private static final int repetitions = 120;
    private static int totalOperations = 250000;
    private static final int cpus = 8;
    private static final List<Batch> batches = batches(cpus);

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };

    public static void main( String[] args ) throws InterruptedException {

        printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
        printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
        printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
        printExecutionTime("Executor pool", ExecServicePerformance::executorPool);

    }

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
        long time = 0;
        for (int i = 0; i < repetitions; i++) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread
            time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
        }
        System.out.println(msg + " exec time: " + time);
    }    

    private static void synchronous() {
        for ( int i = 0; i < totalOperations; i++ ) {
            performanceFunc.apply(i);
        }
    }

    private static void synchronousBatches() {      
        for ( Batch batch : batches) {
            batch.synchronously();
        }
    }

    private static void asynchronousBatches() {

        CountDownLatch cb = new CountDownLatch(cpus);

        for ( Batch batch : batches) {
            Runnable r = () ->  { batch.synchronously(); cb.countDown(); };
            Thread t = new Thread(r);
            t.start();
        }

        try {
            cb.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }        
    }

    private static void executorPool() {

        final ExecutorService es = Executors.newFixedThreadPool(cpus);

        for ( Batch batch : batches ) {
            Runnable r = () ->  { batch.synchronously(); };
            es.submit(r);
        }

        es.shutdown();

        try {
            es.awaitTermination( 10, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } 

    }

    private static List<Batch> batches(final int cpus) {
        List<Batch> list = new ArrayList<Batch>();
        for ( int i = 0; i < cpus; i++ ) {
            list.add( new Batch( totalOperations / cpus ) );
        }
        System.out.println("Batches: " + list.size());
        return list;
    }

    private static class Batch {

        private final int operationsInBatch;

        public Batch( final int ops ) {
            this.operationsInBatch = ops;
        }

        public void synchronously() {
            for ( int i = 0; i < operationsInBatch; i++ ) {
                performanceFunc.apply(i);
            }
        }
    }


}

Total timings in milliseconds, summed over 120 repetitions of 250k operations each:

  • Synchronous exec time: 9956
  • Synchronous batches exec time: 9900
  • Thread per batch exec time: 2176
  • Executor pool exec time: 1922

Winner: Executor Service.