针对兰德流的高性能缓冲

Bry*_*yce 7 java concurrency performance multithreading equation

我的代码消耗了大量(当前数百万,最终数十亿)相对较短(5-100个元素)的随机数阵列,并对它们进行了一些非常非常费力的数学运算.随机数是,随机的,理想情况下我想在多个核心上生成它们,因为随机数生成大约是我的运行时分析的50%.但是,我很难以一种不比单线程方法慢的方式分发大量小任务.

我的代码目前看起来像这样:

for(int i=0;i<1000000;i++){
    for(RealVector d:data){
        while(!converged){
            double[] shortVec = new double[5];
            for(int i=0;i<5;i++) shortVec[i]=rng.nextGaussian();
            double[] longerVec = new double[50];
            for(int i=0;i<50;i++) longerVec[i]=rng.nextGaussian();
            /*Do some relatively fast math*/
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我采取的方法没有奏效的是:

  • 1+个线程填充ArrayBlockingQueue,我的主循环消耗并填充数组(装箱/取消装箱在这里是杀手)
  • 在执行数学的非依赖部分时生成具有Callable(产生未来)的向量(看起来间接的开销超过了我获得的任何并行性增益)
  • 使用2个ArrayBlockingQueue,每个由一个线程填充,一个用于short,一个用于长数组(仍然大约是直接单线程情况的两倍).

我不是在寻找解决我特定问题的"解决方案",而是如何处理并行生成大型小型独立基元流并从单个线程中消耗它们的一般情况.

Gra*_*ray 5

您的性能问题似乎是单个作业太小,因此大部分时间花在执行同步和排队作业本身上.要考虑的一件事是不要生成大量的小作业流,而是要向每个工作线程提供一个中等大小的作业集合,它将用答案进行注释.

例如,不是在第一个线程进行迭代#0的情况下迭代你的循环,而是进行迭代#1的下一个线程,......我将让第一个线程进行迭代#0到#999或其他一些.他们应该独立工作,并Job用他们的计算答案注释一个类.然后在最后他们可以返回已完成作业的整个作业集合Future.

您的Job课程可能如下所示:

public class Job {
    Collection<RealVector> dataCollection;
    Collection<SomeAnswer> answerCollection = new ArrayList<SomeAnswer>();
    public void run() {
        for (RealVector d : dataCollection) {
           // do the magic work on the vector
           while(!converged){
              ...
           }
           // put the associated "answer" in another collection
           answerCollection.add(someAnswer);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


Pet*_*rey 4

这比使用队列更有效,因为;

  • 有效负载是一个数组,double[]意味着后台线程可以在必须传递数据之前生成更多数据。
  • 所有的对象都被回收。

public class RandomGenerator {
    private final ExecutorService generator = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "generator");
            t.setDaemon(true);
            return t;
        }
    });
    private final Exchanger<double[][]> exchanger = new Exchanger<>();
    private double[][] buffer;
    private int nextRow = Integer.MAX_VALUE;

    public RandomGenerator(final int rows, final int columns) {
        buffer = new double[rows][columns];
        generator.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Random random = new Random();
                double[][] buffer2 = new double[rows][columns];
                while (!Thread.interrupted()) {
                    for (int r = 0; r < rows; r++)
                        for (int c = 0; c < columns; c++)
                            buffer2[r][c] = random.nextGaussian();
                    buffer2 = exchanger.exchange(buffer2);
                }
                return null;
            }
        });
    }

    public double[] nextArray() throws InterruptedException {
        if (nextRow >= buffer.length) {
            buffer = exchanger.exchange(buffer);
            nextRow = 0;
        }
        return buffer[nextRow++];
    }
}
Run Code Online (Sandbox Code Playgroud)

随机是线程安全且同步的。这意味着每个线程需要它自己的随机数才能同时执行。

如何处理并行生成大量小型独立基元流并从单个线程使用它们的一般情况。

我会使用 anExchanger<double[][]>在后台填充值,以便有效地传递它们(没有太多 GC 开销)