Java中元素迭代器的并行计算

Bri*_*ris 5 java parallel-processing iterator

我已经多次遇到同样的需求,想听听大家对如何正确构建解决方案的看法。需求是:在多个线程上对大量元素执行某个操作,但不能一次把所有元素都加载到内存中,内存中只应保留正在计算的元素。例如,Iterables.partition 就不够用,因为它会预先把所有元素都放入内存中。

用代码来说,我想写一个 BulkCalc2,它与 BulkCalc1 做同样的事情,只是以并行方式执行。下面的示例代码是我目前最好的尝试。我对它不满意,因为它又长又难看,但它似乎达到了我的目标:在工作完成之前让所有线程保持忙碌,把计算过程中出现的任何异常传播给调用方,并且内存中同时存在的 BigThing 实例不超过 numThreads 个。

我会接受以最简洁的方式满足既定目标的答案,无论是改进 BulkCalc2 的方法还是完全不同的解决方案。

/**
 * An input item handed to {@link Calc#calc}.
 */
interface BigThing {

    /** @return the string whose hash code seeds the computation in {@link Calc#calc} */
    String getString();

    /** @return the identifier under which this item's result is stored */
    int getId();
}

/** Stand-in for a somewhat expensive per-item computation. */
class Calc {

    /**
     * Sums 100000 pseudo-random doubles drawn from a {@link Random} seeded with the hash code of
     * {@code bigThing.getString()}; the same string therefore always yields the same value.
     */
    double calc(BigThing bigThing) {
        final long seed = bigThing.getString().hashCode();
        final Random rng = new Random(seed);
        double sum = 0;
        int remaining = 100000;
        while (remaining-- > 0) {
            sum += rng.nextDouble();
        }
        return sum;
    }
}

/** Sequential baseline: applies {@link Calc#calc} to every element of an iterator. */
class BulkCalc1 {

    final Calc calc;

    public BulkCalc1(Calc calc) {
        this.calc = calc;
    }

    /**
     * Consumes {@code in} on the calling thread, one element at a time.
     *
     * @param in elements to process
     * @return map from each element's {@code getId()} to its computed value, sorted by id; a later
     *     element with a duplicate id overwrites the earlier value
     */
    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        TreeMap<Integer, Double> byId = new TreeMap<Integer, Double>();
        while (in.hasNext()) {
            BigThing current = in.next();
            int id = current.getId();
            byId.put(id, calc.calc(current));
        }
        return byId;
    }
}

/**
 * Adapter that lets several threads share one plain {@link Iterator}: the has-next check and the
 * fetch of the next element are performed together while holding this object's monitor.
 *
 * @param <T> element type
 */
class SafeIterator<T> {

    final Iterator<T> in;

    SafeIterator(Iterator<T> in) {
        this.in = in;
    }

    /**
     * Atomically checks for and takes the next element.
     *
     * <p>NOTE: a {@code null} result is ambiguous: it means either that the underlying iterator is
     * exhausted or that it yielded a {@code null} element.
     *
     * @return the next element, or {@code null} when the underlying iterator has no more elements
     */
    T nextOrNull() {
        synchronized (this) {
            return in.hasNext() ? in.next() : null;
        }
    }
}

/**
 * Parallel counterpart of {@code BulkCalc1}: runs {@link Calc#calc} over an iterator on a fixed
 * pool of {@code numThreads} workers. Workers pull elements one at a time from a shared
 * {@link SafeIterator}, so each worker holds at most one element at a time.
 */
class BulkCalc2 {

    final Calc calc;
    final int numThreads;

    public BulkCalc2(Calc calc, int numThreads) {
        this.calc = calc;
        this.numThreads = numThreads;
    }

    /**
     * Computes {@code calc.calc(x)} for every element {@code x} of {@code in}, keyed by
     * {@code x.getId()}.
     *
     * <p>If the calling thread is interrupted while waiting, the wait continues (so a partial result
     * is never returned while workers may still be writing to it) and the interrupt flag is restored
     * before this method returns or throws.
     *
     * @param in elements to process; only accessed through {@link SafeIterator#nextOrNull()}, so it
     *     does not have to be thread-safe. A {@code null} element is treated like the end of the
     *     iterator by the worker that reads it.
     * @return all results, sorted by id
     * @throws IllegalArgumentException if {@code numThreads <= 0} (from
     *     {@code Executors.newFixedThreadPool})
     * @throws RuntimeException the failure reported by the first failed worker in submission order,
     *     rethrown via {@code Throwables.propagate} ({@link Error}s are rethrown as-is)
     */
    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        ExecutorService e = Executors.newFixedThreadPool(numThreads);
        List<Future<?>> futures = Lists.newArrayListWithCapacity(numThreads);

        final Map<Integer, Double> results = new MapMaker().concurrencyLevel(numThreads).makeMap();
        final SafeIterator<BigThing> it = new SafeIterator<BigThing>(in);
        boolean interrupted = false;
        try {
            for (int i = 0; i < numThreads; i++) {
                futures.add(e.submit(new Runnable() {

                    @Override
                    public void run() {
                        // Checking the interrupt flag lets shutdownNow() stop the remaining
                        // workers once a failure has been reported to the caller.
                        while (!Thread.currentThread().isInterrupted()) {
                            BigThing o = it.nextOrNull();
                            if (o == null) {
                                return;
                            }
                            results.put(o.getId(), calc.calc(o));
                        }
                    }
                }));
            }

            e.shutdown();

            for (Future<?> future : futures) {
                // Retry get() after an interrupt instead of skipping this future: skipping would
                // return an incomplete map and silently drop this worker's failure.
                while (true) {
                    try {
                        future.get();
                        break;
                    } catch (InterruptedException ex) {
                        interrupted = true;
                    } catch (ExecutionException ex) {
                        throw Throwables.propagate(ex.getCause());
                    }
                }
            }
        } finally {
            // No-op after normal completion; after a failure it interrupts the other workers so
            // they stop draining the iterator in the background.
            e.shutdownNow();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        return new TreeMap<Integer, Double>(results);
    }
}
Run Code Online (Sandbox Code Playgroud)

Bob*_*Gee 1

编辑:修改后的、更快的版本

注意:这个版本其实并不更简洁,但运行速度应该快得多。要处理一个迭代器,可以调用静态方法 BulkCalcRunner.runBulkCalc(Iterator, Calc),也可以调用带线程数参数的重载 runBulkCalc(Iterator, Calc, int) 来指定线程数。它代码干净、相对简洁,而且可能是你能得到的最快的解决方案。

速度更快的原因:

  • 结果先收集到每个线程自己的 HashMap 中——收集过程不需要同步;否则,每存储一个结果都需要同步。这提高了随线程数增加的可扩展性,并带来更好的引用局部性(每个线程的 HashMap 可以完全驻留在所在处理器的 L2 缓存中,无需跨处理器通信)。
  • 线程本地结果使用普通 HashMap,而不是效率较低的并发 Map(问题代码中由 MapMaker 创建)
  • 错误被收集到一个集合中,留待之后统一处理。使用线程池时,每个异常都会导致一个线程终止并被重新创建(注:这只适用于通过 execute() 提交的任务;通过 submit() 提交时,异常会被封装进返回的 Future,工作线程不会终止)。

    /**
     * Input element: an integer id (used as the result key) and a string (hashed by Calc to seed
     * its computation).
     */
    interface BigThing {
        int getId();

        String getString();
    }

    /** Stand-in for a somewhat expensive per-element computation. */
    class Calc {
        /** Number of pseudo-random values summed per call. */
        private static final int DRAWS = 100000;

        /**
         * Sums {@value #DRAWS} pseudo-random doubles from a generator seeded with the hash code of
         * {@code bigThing.getString()}, so equal strings always give equal results.
         */
        double calc(BigThing bigThing) {
            int seed = bigThing.getString().hashCode();
            Random generator = new Random(seed);
            double total = 0;
            for (int draw = 0; draw < DRAWS; draw++) {
                total += generator.nextDouble();
            }
            return total;
        }
    }
    
    /**
     * Worker that drains a shared iterator of {@link BigThing}s. It buffers results and failures in
     * thread-local collections and merges them into the shared collections when it finishes.
     * Entry points: {@link #runBulkCalc(Iterator, Calc)} and {@link #runBulkCalc(Iterator, Calc, int)}.
     */
    static class BulkCalcRunner implements Runnable {
        Calc calc;
        CountDownLatch latch;
        Iterator<BigThing> it;
        Collection<Throwable> errors;
        Map<Integer,Double> results;

        /**
         * @param calc    computation applied to each element
         * @param it      iterator shared by all workers; only accessed while holding its monitor
         * @param latch   counted down exactly once when this worker has merged its output
         * @param results shared map receiving this worker's results, keyed by {@code getId()}
         * @param errors  shared collection receiving this worker's failures; guarded by its own monitor
         */
        public BulkCalcRunner (Calc calc, Iterator<BigThing> it, CountDownLatch latch, Map<Integer,Double> results, Collection<Throwable> errors) {
            this.calc = calc;
            // Must be set: run() locks on and reads this shared iterator.
            this.it = it;
            this.latch = latch;
            this.errors = errors;
            this.results = results;
        }

        @Override
        public void run() {
            Collection<Throwable> errorLocal = new ArrayList<Throwable>();
            Map<Integer,Double> resultsLocal = new HashMap<Integer,Double>();
            try {
                drain(resultsLocal, errorLocal);
            } catch (Throwable t) {
                // Anything not handled per element (e.g. an Error from calc) must still reach the caller.
                errorLocal.add(t);
            } finally {
                try {
                    synchronized (errors) {
                        errors.addAll(errorLocal);
                    }
                    synchronized (results) {
                        results.putAll(resultsLocal);
                    }
                } finally {
                    // Always check in; otherwise runBulkCalc would wait forever.
                    latch.countDown();
                }
            }
        }

        /** Pulls elements from the shared iterator until it is exhausted or fails. */
        private void drain(Map<Integer,Double> resultsLocal, Collection<Throwable> errorLocal) {
            while (true) {
                BigThing thing;
                try {
                    synchronized (it) {
                        // End of input is detected via hasNext(), so a null element is computed
                        // (and its failure reported) instead of silently ending this worker.
                        if (!it.hasNext()) {
                            return;
                        }
                        thing = it.next();
                    }
                } catch (Exception e) {
                    // A broken iterator cannot be advanced reliably: report it and stop this worker.
                    errorLocal.add(e);
                    return;
                }
                try {
                    resultsLocal.put(thing.getId(), calc.calc(thing));
                } catch (Exception e) {
                    errorLocal.add(e);
                }
            }
        }

        /**
         * Computes {@code calculation.calc(x)} for every element of {@code iterator} on
         * {@code numThreads} new threads.
         *
         * <p>If the caller is interrupted while waiting, the wait continues (the worker threads
         * cannot be cancelled from here, and returning early would hand back a map they are still
         * filling) and the interrupt flag is restored afterwards.
         *
         * @return results keyed by {@code getId()}
         * @throws IllegalArgumentException if {@code numThreads < 1}
         * @throws RuntimeException the first recorded failure, rethrown via {@code Throwables.propagate}
         *     ({@link Error}s are rethrown as-is); the other workers still finish first
         */
        public static Map<Integer,Double> runBulkCalc(Iterator<BigThing> iterator, Calc calculation, int numThreads) {
            if (numThreads < 1) {
                throw new IllegalArgumentException("numThreads must be positive: " + numThreads);
            }
            final ConcurrentHashMap<Integer, Double> results = new ConcurrentHashMap<Integer, Double>();
            final ArrayList<Throwable> errors = new ArrayList<Throwable>();
            final CountDownLatch latch = new CountDownLatch(numThreads);

            for (int i = 0; i < numThreads; i++) {
                new Thread(new BulkCalcRunner(calculation, iterator, latch, results, errors)).start();
            }

            awaitUninterruptibly(latch);

            // The latch makes every worker's merge visible here. Propagate the recorded failure
            // itself; its getCause() is usually null and would lose the error.
            if (!errors.isEmpty()) {
                throw Throwables.propagate(errors.get(0));
            }
            return results;
        }

        /** Same as the three-argument overload, with one thread per available processor. */
        public static Map<Integer,Double> runBulkCalc(Iterator<BigThing> iterator, Calc calculation) {
            return runBulkCalc(iterator, calculation, Runtime.getRuntime().availableProcessors());
        }

        /** Waits for {@code latch}; an interrupt does not abandon the wait but is re-asserted afterwards. */
        private static void awaitUninterruptibly(CountDownLatch latch) {
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        latch.await();
                        return;
                    } catch (InterruptedException ex) {
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)