Rob*_*obz 6 java parallel-processing multithreading quicksort
我正在尝试在Java中并行化算法.我从合并排序开始,并在这个问题上发布了我的尝试.我修改过的尝试是在下面的代码中,我现在尝试并行快速排序.
在我的多线程实现或解决此问题的方法中是否存在任何新手错误?如果不是,我不应期望在双核上的顺序算法和并行算法之间的速度增加超过32%(参见底部的时间)?
这是多线程算法:
    public class ThreadedQuick extends Thread
    {
        final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
        CountDownLatch doneSignal;
        static int num_threads = 1;
        int[] my_array;
        int start, end;
        public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) {
            this.my_array = array;
            this.start = start;
            this.end = end;
            this.doneSignal = doneSignal;
        }
        public static void reset() {
            num_threads = 1;
        }
        public void run() {
            quicksort(my_array, start, end);
            doneSignal.countDown();
            num_threads--;
        }
        public void quicksort(int[] array, int start, int end) {
            int len = end-start+1;
            if (len <= 1)
                return;
            int pivot_index = medianOfThree(array, start, end);
            int pivotValue = array[pivot_index];
            swap(array, pivot_index, end);
            int storeIndex = start;
            for (int i = start; i < end; i++) {
               if (array[i] <= pivotValue) {
                   swap(array, i, storeIndex);
                   storeIndex++;
               }
            }
            swap(array, storeIndex, end);
            if (num_threads < MAX_THREADS) {
                num_threads++;
                CountDownLatch completionSignal = new CountDownLatch(1);
                new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start();
                quicksort(array, storeIndex + 1, end);
                try {
                    completionSignal.await(1000, TimeUnit.SECONDS);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
            } else {
                quicksort(array, start, storeIndex - 1);
                quicksort(array, storeIndex + 1, end);
            }
        }
    }
以下是我的启动方式:
ThreadedQuick.reset();
CountDownLatch completionSignal = new CountDownLatch(1);
new ThreadedQuick(completionSignal, array, 0, array.length-1).start();
try {
    completionSignal.await(1000, TimeUnit.SECONDS);
} catch(Exception ex){
    ex.printStackTrace();
}
我针对Arrays.sort和类似的顺序快速排序算法对此进行了测试.以下是intel duel-core dell笔记本电脑的计时结果,只需几秒钟:
元素:500,000,顺序:0.068592,线程:0.046871,Arrays.sort:0.079677
元素:1,000,000,顺序:0.14416,线程:0.095492,Arrays.sort:0.167155
元素:2,000,000,顺序:0.301666,线程:0.205719,Arrays.sort:0.350982
元素:4,000,000,顺序:0.623291,线程:0.424119,Arrays.sort:0.712698
元素:8,000,000,顺序:1.279374,线程:0.859363,Arrays.sort:1.487671
上面的每个数字是100次测试的平均时间,抛出3个最低和3个最高的情况.我使用Random.nextInt(Integer.MAX_VALUE)为每个测试生成一个数组,每10次测试使用相同的种子初始化一次.每个测试包括使用System.nanoTime对给定算法进行计时.平均后我舍入到小数点后六位.显然,我确实检查了每种是否有效.
如您所见,在每组测试中,顺序和线程案例之间的速度提高了约32%.正如我上面提到的,我不应该期待更多吗?
Mic*_*ker 10
将numThreads设为静态可能会导致问题,很可能最终会在某些时候运行MAX_THREADS以上.
可能你没有在性能上完全翻倍的原因是你的快速排序无法完全并行化.请注意,对quicksort的第一次调用将在初始线程中开始真正并行运行之前传递整个数组.在耕种到单独的线程时,还存在以上下文切换和模式转换的形式并行化算法的开销.
看看Fork/Join框架,这个问题可能非常适合那里.
关于实施的几点意见.实现Runnable而不是扩展Thread.只有在创建一些新版本的Thread类时,才应使用扩展线程.当你只想做一些并行运行的工作时,你最好使用Runnable.在运行Runnable的同时,您还可以扩展另一个类,从而为OO设计提供更大的灵活性.使用仅限于系统中可用线程数的线程池.也不要使用numThreads来决定是否分叉新线程.你可以预先计算出来.使用最小分区大小,即总阵列的大小除以可用的处理器数.就像是:
public class ThreadedQuick implements Runnable {
    public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
    static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
    final int[] my_array;
    final int start, end;
    private final int minParitionSize;
    public ThreadedQuick(int minParitionSize, int[] array, int start, int end) {
        this.minParitionSize = minParitionSize;
        this.my_array = array;
        this.start = start;
        this.end = end;
    }
    public void run() {
        quicksort(my_array, start, end);
    }
    public void quicksort(int[] array, int start, int end) {
        int len = end - start + 1;
        if (len <= 1)
            return;
        int pivot_index = medianOfThree(array, start, end);
        int pivotValue = array[pivot_index];
        swap(array, pivot_index, end);
        int storeIndex = start;
        for (int i = start; i < end; i++) {
            if (array[i] <= pivotValue) {
                swap(array, i, storeIndex);
                storeIndex++;
            }
        }
        swap(array, storeIndex, end);
        if (len > minParitionSize) {
            ThreadedQuick quick = new ThreadedQuick(minParitionSize, array, start, storeIndex - 1);
            Future<?> future = executor.submit(quick);
            quicksort(array, storeIndex + 1, end);
            try {
                future.get(1000, TimeUnit.SECONDS);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        } else {
            quicksort(array, start, storeIndex - 1);
            quicksort(array, storeIndex + 1, end);
        }
    }    
}
你可以这样做:
ThreadedQuick quick = new ThreadedQuick(array / ThreadedQuick.MAX_THREADS, array, 0, array.length - 1);
quick.run();
这将在同一个线程中启动排序,这可以避免在启动时出现不必要的线程跳转.
警告:不确定上面的实现会更快,因为我没有对它进行基准测试.