并行化快速排序使其变慢

And*_*man 5 java sorting parallel-processing multithreading quicksort

我正在快速搜索大量数据,为了好玩,我试图将其并行化以加快排序.但是,在它的当前形式中,由于同步阻塞点,多线程版本比单线程版本慢.
每次我产生一个线程,我得到一个锁在一个int并加一,而每次线程完成我再次得到锁和减量,除了检查是否有仍然运行(INT> 0)的线程.如果没有,我唤醒我的主线程并使用已排序的数据.

我相信有更好的方法可以做到这一点.不知道它是什么.非常感谢帮助.

编辑:我想我没有提供足够的信息.
这是octo-core Opteron上的Java代码.我无法切换语言.
我正在排序的数量适合内存,并且在调用quicksort时它已经存在于内存中,因此没有理由将其写入磁盘只是将其读回内存.
通过"获取锁定"我的意思是在整数上有一个同步块.

小智 8

在不了解更多有关实施的地方,我的建议和/或意见如下:

  1. 限制在任何给定时间可以运行的线程数.Pergaps 8或10(可能为调度程序提供更多余地,尽管最好每个核心/ hw线程放一个).如果亲和力不支持,则在CPU绑定问题上为"吞吐量"运行更多线程并不是真的有意义.

  2. 不要靠近树叶!只有更大的分支上的线程.没有必要产生一个线程来排序相对较少数量的项目,在这个级别上有许多小分支!线程会在这里增加更多的相对开销.(这类似于切换到叶子的"简单排序").

  3. 确保每个线程可以独立工作 - 不应该在工作期间踩到另一个线程 - > 没有锁,只是等待连接.分而治之.

  4. 可能会考虑执行"广度优先"方法来生成线程.

  5. 考虑一个快速排序的合并(我偏向于mergesort :-)请记住,有许多不同类型的合并,包括自下而上.

编辑

  1. 确保它确实有效.请记住正确利用线程之间的内存障碍 - 即使没有两个线程同时修改相同的数据以确保正确的可见性也需要.

编辑(概念验证):

我把这个简单的演示融合在一起.在我的Intel Core2 Duo @ 2Ghz上,我可以在大约2/3到3/4的时间运行,这肯定是一些改进:)(设置:DATA_SIZE = 3000000,MAX_THREADS = 4,MIN_PARALLEL = 1000).这是从维基百科中删除的基本就地快速排序代码,它没有利用任何其他基本优化.

它确定一个新线程是否可以/应该启动的方法也是非常原始的 - 如果没有新线程可用,它只是一直突然(因为,你知道,为什么要等待?)

这段代码也应该(希望)与线程一起广泛地扇出.这可能对数据局部性的效率低于保持深度的效率,但如果我的头脑,模型似乎很简单.

执行程序服务还用于简化设计并能够重用相同的线程(与生成新线程相比).在执行程序开销显示之前,MIN_PARALLEL可能变得非常小(例如,大约20) - 最大线程数和仅使用新线程 - 如果可能的话也可以保持这一点.

qsort average seconds: 0.6290541056
pqsort average seconds: 0.4513915392

我绝对不保证这段代码的实用性或正确性,但它"似乎在这里工作".请注意ThreadPoolExecutor旁边的警告,因为它清楚地表明我不完全确定发生了什么:-) 我相当肯定设计在利用线程方面有些缺陷.

package psq;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

public class Main {

    int[] genData (int len) {
        Random r = new Random();
        int[] newData = new int[len];
        for (int i = 0; i < newData.length; i++) {
            newData[i] = r.nextInt();
        }
        return newData;
    }      

    boolean check (int[] arr) {
        if (arr.length == 0) {
            return true;
        }
        int lastValue = arr[0];
        for (int i = 1; i < arr.length; i++) {
            //System.out.println(arr[i]);
            if (arr[i] < lastValue) {
                return false;
            }
            lastValue = arr[i];
        }
        return true;
    }

    int partition (int[] arr, int left, int right, int pivotIndex) {
        // pivotValue := array[pivotIndex]
        int pivotValue = arr[pivotIndex];
        {
            // swap array[pivotIndex] and array[right] // Move pivot to end
            int t = arr[pivotIndex];
            arr[pivotIndex] = arr[right];
            arr[right] = t;
        }
        // storeIndex := left
        int storeIndex = left;
        // for i  from  left to right - 1 // left ? i < right
        for (int i = left; i < right; i++) {
            //if array[i] ? pivotValue
            if (arr[i] <= pivotValue) {
                //swap array[i] and array[storeIndex]
                //storeIndex := storeIndex + 1            
                int t = arr[i];
                arr[i] = arr[storeIndex];
                arr[storeIndex] = t;
                storeIndex++;                   
            }
        }
        {
            // swap array[storeIndex] and array[right] // Move pivot to its final place
            int t = arr[storeIndex];
            arr[storeIndex] = arr[right];
            arr[right] = t;
        }
        // return storeIndex
        return storeIndex;
    }

    void quicksort (int[] arr, int left, int right) {
        // if right > left
        if (right > left) {            
            // select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
            int pivotIndex = left + (right - left) / 2;
            // pivotNewIndex := partition(array, left, right, pivotIndex)
            int pivotNewIndex = partition(arr, left, right, pivotIndex);
            // quicksort(array, left, pivotNewIndex - 1)
            // quicksort(array, pivotNewIndex + 1, right)
            quicksort(arr, left, pivotNewIndex - 1);
            quicksort(arr, pivotNewIndex + 1, right);
        }
    }

    static int DATA_SIZE = 3000000;
    static int MAX_THREADS = 4;
    static int MIN_PARALLEL = 1000;

    // NOTE THAT THE THREAD POOL EXECUTER USES A LINKEDBLOCKINGQUEUE
    // That is, because it's possible to OVER SUBMIT with this code,
    // even with the semaphores!
    ThreadPoolExecutor tp = new ThreadPoolExecutor(
            MAX_THREADS,
            MAX_THREADS,
            Long.MAX_VALUE,
            TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());
    // if there are no semaphore available then then we just continue
    // processing from the same thread and "deal with it"
    Semaphore sem = new Semaphore(MAX_THREADS, false); 

    class QuickSortAction implements Runnable {
        int[] arr;
        int left;
        int right;

        public QuickSortAction (int[] arr, int left, int right) {
            this.arr = arr;
            this.left = left;
            this.right = right;
        }

        public void run () {
            try {
                //System.out.println(">>[" + left + "|" + right + "]");
                pquicksort(arr, left, right);
                //System.out.println("<<[" + left + "|" + right + "]");
            } catch (Exception ex) {
                // I got nothing for this
                throw new RuntimeException(ex); 
            }
        }

    }

    // pquicksort
    // threads will [hopefully] fan-out "breadth-wise"
    // this is because it's likely that the 2nd executer (if needed)
    // will be submitted prior to the 1st running and starting its own executors
    // of course this behavior is not terribly well-define
    void pquicksort (int[] arr, int left, int right) throws ExecutionException, InterruptedException {
        if (right > left) {
            // memory barrier -- pquicksort is called from different threads
            synchronized (arr) {}

            int pivotIndex = left + (right - left) / 2;
            int pivotNewIndex = partition(arr, left, right, pivotIndex);

            Future<?> f1 = null;
            Future<?> f2 = null;

            if ((pivotNewIndex - 1) - left > MIN_PARALLEL) {
                if (sem.tryAcquire()) {
                    f1 = tp.submit(new QuickSortAction(arr, left, pivotNewIndex - 1));
                } else {
                    pquicksort(arr, left, pivotNewIndex - 1);
                }
            } else {
                quicksort(arr, left, pivotNewIndex - 1);
            }
            if (right - (pivotNewIndex + 1) > MIN_PARALLEL) {
                if (sem.tryAcquire()) {
                    f2 = tp.submit(new QuickSortAction(arr, pivotNewIndex + 1, right));
                } else {
                    pquicksort(arr, pivotNewIndex + 1, right);
                }
            } else {
                quicksort(arr, pivotNewIndex + 1, right);
            }

            // join back up
            if (f1 != null) {
                f1.get();
                sem.release();
            }
            if (f2 != null) {
                f2.get();
                sem.release();
            }
        }        
    }

    long qsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        quicksort(data, 0, data.length - 1);
        long duration = System.nanoTime() - start;
        if (!check(data)) {
            throw new Exception("qsort not sorted!");
        }
        return duration;
    }

    long pqsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        pquicksort(data, 0, data.length - 1);
        long duration = System.nanoTime() - start;
        if (!check(data)) {
            throw new Exception("pqsort not sorted!");
        }
        return duration;
    }

    public Main () throws Exception {
        long qsort_duration = 0;
        long pqsort_duration = 0;
        int ITERATIONS = 10;
        for (int i = 0; i < ITERATIONS; i++) {
            System.out.println("Iteration# " + i);
            int[] data = genData(DATA_SIZE);
            if ((i & 1) == 0) {
                qsort_duration += qsort_call(data);
                pqsort_duration += pqsort_call(data);
            } else {
                pqsort_duration += pqsort_call(data);
                qsort_duration += qsort_call(data);
            }
        }
        System.out.println("====");
        System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
        System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
    }

    public static void main(String[] args) throws Exception {
        new Main();
    }

}
Run Code Online (Sandbox Code Playgroud)

因人而异.快乐的编码.

(另外,我想知道你的8核盒子上的这个 - 或类似的 - 代码展览会.维基百科声称可以通过cpus数量进行线性加速:)

编辑(更好的数字)

删除使用期货导致轻微的"堵塞"并转换为单一的最终等待信号量:减少无用的等待.现在只运行55%的非线程时间:-)

qsort average seconds: 0.5999702528
pqsort average seconds: 0.3346969088

(

package psq;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

public class Main {

    int[] genData (int len) {
        Random r = new Random();
        int[] newData = new int[len];
        for (int i = 0; i < newData.length; i++) {
            newData[i] = r.nextInt();
        }
        return newData;
    }      

    boolean check (int[] arr) {
        if (arr.length == 0) {
            return true;
        }
        int lastValue = arr[0];
        for (int i = 1; i < arr.length; i++) {
            //System.out.println(arr[i]);
            if (arr[i] < lastValue) {
                return false;
            }
            lastValue = arr[i];
        }
        return true;
    }

    int partition (int[] arr, int left, int right, int pivotIndex) {
        // pivotValue := array[pivotIndex]
        int pivotValue = arr[pivotIndex];
        {
            // swap array[pivotIndex] and array[right] // Move pivot to end
            int t = arr[pivotIndex];
            arr[pivotIndex] = arr[right];
            arr[right] = t;
        }
        // storeIndex := left
        int storeIndex = left;
        // for i  from  left to right - 1 // left ? i < right
        for (int i = left; i < right; i++) {
            //if array[i] ? pivotValue
            if (arr[i] <= pivotValue) {
                //swap array[i] and array[storeIndex]
                //storeIndex := storeIndex + 1            
                int t = arr[i];
                arr[i] = arr[storeIndex];
                arr[storeIndex] = t;
                storeIndex++;                   
            }
        }
        {
            // swap array[storeIndex] and array[right] // Move pivot to its final place
            int t = arr[storeIndex];
            arr[storeIndex] = arr[right];
            arr[right] = t;
        }
        // return storeIndex
        return storeIndex;
    }

    void quicksort (int[] arr, int left, int right) {
        // if right > left
        if (right > left) {            
            // select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
            int pivotIndex = left + (right - left) / 2;
            // pivotNewIndex := partition(array, left, right, pivotIndex)
            int pivotNewIndex = partition(arr, left, right, pivotIndex);
            // quicksort(array, left, pivotNewIndex - 1)
            // quicksort(array, pivotNewIndex + 1, right)
            quicksort(arr, left, pivotNewIndex - 1);
            quicksort(arr, pivotNewIndex + 1, right);
        }
    }

    static int DATA_SIZE = 3000000;
    static int MAX_EXTRA_THREADS = 7;
    static int MIN_PARALLEL = 500;

    // To get to reducePermits
    @SuppressWarnings("serial")
    class Semaphore2 extends Semaphore {
        public Semaphore2(int permits, boolean fair) {
            super(permits, fair);
        }
        public void removePermit() {
            super.reducePermits(1);
        }
    }

    class QuickSortAction implements Runnable {
        final int[] arr;
        final int left;
        final int right;
        final SortState ss;

        public QuickSortAction (int[] arr, int left, int right, SortState ss) {
            this.arr = arr;
            this.left = left;
            this.right = right;
            this.ss = ss;
        }

        public void run () {
            try {
                //System.out.println(">>[" + left + "|" + right + "]");
                pquicksort(arr, left, right, ss);
                //System.out.println("<<[" + left + "|" + right + "]");
                ss.limit.release();
                ss.countdown.release();
            } catch (Exception ex) {
                // I got nothing for this
                throw new RuntimeException(ex); 
            }
        }

    }

    class SortState {
        final public ThreadPoolExecutor pool = new ThreadPoolExecutor(
            MAX_EXTRA_THREADS,
            MAX_EXTRA_THREADS,
            Long.MAX_VALUE,
            TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());
        // actual limit: executor may actually still have "active" things to process
        final public Semaphore limit = new Semaphore(MAX_EXTRA_THREADS, false); 
        final public Semaphore2 countdown = new Semaphore2(1, false); 
    }

    void pquicksort (int[] arr) throws Exception {
        SortState ss = new SortState();
        pquicksort(arr, 0, arr.length - 1, ss);
        ss.countdown.acquire();
    }

    // pquicksort
    // threads "fork" if available.
    void pquicksort (int[] arr, int left, int right, SortState ss) throws ExecutionException, InterruptedException {
        if (right > left) {
            // memory barrier -- pquicksort is called from different threads
            // and those threads may be created because they are in an executor
            synchronized (arr) {}

            int pivotIndex = left + (right - left) / 2;
            int pivotNewIndex = partition(arr, left, right, pivotIndex);

            {
                int newRight = pivotNewIndex - 1;
                if (newRight - left > MIN_PARALLEL) {
                    if (ss.limit.tryAcquire()) {
                        ss.countdown.removePermit();
                        ss.pool.submit(new QuickSortAction(arr, left, newRight, ss));
                    } else {
                        pquicksort(arr, left, newRight, ss);
                    }
                } else {
                    quicksort(arr, left, newRight);
                }
            }

            {
                int newLeft = pivotNewIndex + 1;
                if (right - newLeft > MIN_PARALLEL) {
                    if (ss.limit.tryAcquire()) {
                        ss.countdown.removePermit();
                        ss.pool.submit(new QuickSortAction(arr, newLeft, right, ss));
                    } else {
                        pquicksort(arr, newLeft, right, ss);
                    }
                } else {
                    quicksort(arr, newLeft, right);
                }
            }

        }        
    }

    long qsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        quicksort(data, 0, data.length - 1);
        long duration = System.nanoTime() - start;
        if (!check(data)) {
            throw new Exception("qsort not sorted!");
        }
        return duration;
    }

    long pqsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        pquicksort(data);
        long duration = System.nanoTime() - start;
        if (!check(data)) {            
            throw new Exception("pqsort not sorted!");
        }
        return duration;
    }

    public Main () throws Exception {
        long qsort_duration = 0;
        long pqsort_duration = 0;
        int ITERATIONS = 10;
        for (int i = 0; i < ITERATIONS; i++) {
            System.out.println("Iteration# " + i);
            int[] data = genData(DATA_SIZE);
            if ((i & 1) == 0) {
                qsort_duration += qsort_call(data);
                pqsort_duration += pqsort_call(data);
            } else {
                pqsort_duration += pqsort_call(data);
                qsort_duration += qsort_call(data);
            }
        }
        System.out.println("====");
        System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
        System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
    }

    public static void main(String[] args) throws Exception {
        new Main();
    }

}
Run Code Online (Sandbox Code Playgroud)