Bri*_*ris 5 java parallel-processing iterator
同样的需求我已经遇到过好几次了，想听听大家对如何正确构建解决方案的其他想法。需求是：在多个线程上对多个元素执行某些操作，但不必一次把所有元素都放进内存，只需保留正在计算的那些元素。例如，Iterables.partition 就不够用，因为它会预先把所有元素都放入内存。
用代码来说，我想写一个 BulkCalc2，它和 BulkCalc1 做同样的事情，只是以并行方式执行。下面的示例代码是我目前最好的尝试。我对它不满意，因为它又长又丑，但它似乎达到了我的目标：在工作完成之前保持线程的高利用率；把计算过程中出现的任何异常传播出来；并且内存中同时存在的 BigThing 实例不超过 numThreads 个。
无论是改进 BulkCalc2，还是提出完全不同的方案，我都会采纳以最简洁的方式满足上述目标的答案。
/**
 * A (potentially large) input item processed by {@code Calc}.
 */
interface BigThing {
  /** Text whose hash code seeds the per-item computation. */
  String getString();

  /** Key under which this item's computed value is stored in the results map. */
  int getId();
}
/** Performs a deliberately CPU-heavy computation for a single {@link BigThing}. */
class Calc {
  /**
   * Sums 100000 pseudo-random doubles drawn from a {@link Random} seeded with the hash code of
   * {@code bigThing.getString()}, so items with equal strings always yield equal results.
   *
   * @param bigThing the item to compute over; a null {@code getString()} result causes a
   *     NullPointerException
   * @return the sum of the drawn values
   */
  double calc(BigThing bigThing) {
    final Random rng = new Random(bigThing.getString().hashCode());
    double sum = 0;
    int remaining = 100000;
    while (remaining-- > 0) {
      sum += rng.nextDouble();
    }
    return sum;
  }
}
/** Sequential reference implementation: computes every element on the calling thread. */
class BulkCalc1 {
  final Calc calc;

  public BulkCalc1(Calc calc) {
    this.calc = calc;
  }

  /**
   * Pulls elements from {@code in} one at a time and maps each element's id to its computed
   * value. If two elements share an id, the later element's value wins.
   *
   * @param in source of items; fully consumed by this call
   * @return results keyed and sorted by id
   */
  public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
    final TreeMap<Integer, Double> byId = new TreeMap<Integer, Double>();
    for (BigThing item; in.hasNext(); ) {
      item = in.next();
      byId.put(item.getId(), calc.calc(item));
    }
    return byId;
  }
}
/**
 * Wraps an iterator behind a single synchronized "take next" operation so that several threads
 * can share it. Because {@code null} signals exhaustion, a null element in the underlying
 * iterator is indistinguishable from the end of iteration.
 */
class SafeIterator<T> {
  final Iterator<T> in;

  SafeIterator(Iterator<T> in) {
    this.in = in;
  }

  /**
   * Returns the next element, or {@code null} when none are left. The hasNext/next pair runs
   * while holding this object's monitor, so concurrent callers cannot interleave between them.
   */
  synchronized T nextOrNull() {
    return in.hasNext() ? in.next() : null;
  }
}
/**
 * Parallel version of {@link BulkCalc1}: {@code numThreads} workers share one iterator, and each
 * worker holds a single element at a time, so at most {@code numThreads} elements are in flight.
 */
class BulkCalc2 {
  final Calc calc;
  final int numThreads;

  public BulkCalc2(Calc calc, int numThreads) {
    this.calc = calc;
    this.numThreads = numThreads;
  }

  /**
   * Computes every element of {@code in} on a fixed pool of {@code numThreads} threads.
   *
   * @param in source of items; elements are pulled lazily and a {@code null} element is treated
   *     as the end of input
   * @return results keyed and sorted by id
   * @throws IllegalStateException if the calling thread is interrupted while waiting; the
   *     interrupt flag is restored and the workers are interrupted
   * @throws RuntimeException the first worker failure observed, rethrown via Guava
   *     {@code Throwables.propagate}; the remaining workers are interrupted
   */
  public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
    ExecutorService e = Executors.newFixedThreadPool(numThreads);
    List<Future<?>> futures = Lists.newLinkedList();
    final Map<Integer, Double> results = new MapMaker().concurrencyLevel(numThreads).makeMap();
    final SafeIterator<BigThing> it = new SafeIterator<BigThing>(in);
    try {
      for (int i = 0; i < numThreads; i++) {
        futures.add(e.submit(new Runnable() {
          @Override
          public void run() {
            // Stop taking new elements once shutdownNow() has interrupted this worker.
            while (!Thread.currentThread().isInterrupted()) {
              BigThing o = it.nextOrNull();
              if (o == null) {
                return;
              }
              results.put(o.getId(), calc.calc(o));
            }
          }
        }));
      }
      // No further tasks: lets the pool threads exit once the workers finish.
      e.shutdown();
      for (Future<?> future : futures) {
        try {
          future.get();
        } catch (InterruptedException ex) {
          // Returning here would hand back partial results while workers are still running.
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while waiting for bulk calculation", ex);
        } catch (ExecutionException ex) {
          throw Throwables.propagate(ex.getCause());
        }
      }
    } finally {
      // A no-op after normal completion; on failure or interruption it interrupts the remaining
      // workers so they stop consuming the iterator in the background.
      e.shutdownNow();
    }
    return new TreeMap<Integer, Double>(results);
  }
}
Run Code Online (Sandbox Code Playgroud)
编辑：修改后的、更快的版本
注意：这个版本其实并没有更简洁，但运行速度应该快得多。要处理一个迭代器，可以调用静态方法 BulkCalcRunner.runBulkCalc(Iterator, Calc)，也可以调用额外指定线程数的重载 runBulkCalc(Iterator, Calc, int)。它清晰、还算简洁，而且可能是你能得到的最快的方案。
错误会被收集到一个集合中，留待之后统一处理。（如果改用线程池，每个异常都会导致一个线程终止并被重新创建。注：这只适用于通过 execute() 提交的任务；通过 submit() 提交时，异常会被封装进返回的 Future，工作线程不会因此终止。）
/**
 * A (potentially large) input item: {@link #getId()} is the results-map key and
 * {@link #getString()} seeds the per-item computation.
 */
interface BigThing {
  int getId();

  String getString();
}
/** Stand-in for an expensive per-item computation. */
class Calc {
  /**
   * Seeds a {@link Random} with {@code bigThing.getString().hashCode()} and returns the sum of
   * its first 100000 {@code nextDouble()} values, so identical strings give identical results.
   * A null {@code getString()} result causes a NullPointerException.
   */
  double calc(BigThing bigThing) {
    int seed = bigThing.getString().hashCode();
    Random source = new Random(seed);
    double total = 0;
    for (int n = 100000; n > 0; n--) {
      total += source.nextDouble();
    }
    return total;
  }
}
/**
 * Worker that repeatedly takes a {@link BigThing} from a shared iterator and computes it with
 * {@link Calc}, plus static entry points that run several such workers in parallel.
 *
 * <p>Each worker buffers its results and failures locally and merges them into the shared
 * collections once, when it stops, before counting down the latch.
 */
static class BulkCalcRunner implements Runnable {
  Calc calc;
  CountDownLatch latch;
  Iterator<BigThing> it;
  Collection<Throwable> errors;
  Map<Integer,Double> results;

  /**
   * @param calc computation applied to each element
   * @param it iterator shared by all workers; every access synchronizes on it
   * @param latch counted down exactly once when this worker stops, whatever the reason
   * @param results shared sink for id -> value results, merged under its own monitor
   * @param errors shared sink for failures, merged under its own monitor
   */
  public BulkCalcRunner (Calc calc, Iterator<BigThing> it, CountDownLatch latch, Map<Integer,Double> results, Collection<Throwable> errors) {
    this.calc = calc;
    // Previously missing: without it, run() threw NullPointerException at synchronized (it).
    this.it = it;
    this.latch = latch;
    this.errors = errors;
    this.results = results;
  }

  /**
   * Processes elements until one of these happens: the iterator is exhausted (or yields
   * {@code null}), the iterator itself throws, or an {@link Error} escapes. An exception from a
   * single computation is recorded, and the worker moves on to the next element.
   */
  @Override
  public void run() {
    ArrayList<Throwable> errorLocal = new ArrayList<Throwable>();
    HashMap<Integer,Double> resultsLocal = new HashMap<Integer,Double>();
    try {
      while (true) {
        BigThing thing;
        try {
          synchronized (it) {
            thing = it.hasNext() ? it.next() : null;
          }
        } catch (Exception e) {
          // An iterator failure ends this worker; record it instead of silently dropping it.
          errorLocal.add(e);
          break;
        }
        // finished when first null BigThing encountered
        if (thing == null) {
          break;
        }
        try {
          resultsLocal.put(thing.getId(), calc.calc(thing));
        } catch (Exception e) {
          errorLocal.add(e);
        }
      }
    } catch (Throwable t) {
      // e.g. an Error thrown by calc(): record it so the caller sees the failure.
      errorLocal.add(t);
    } finally {
      // Always publish and count down; otherwise runBulkCalc() would wait forever.
      synchronized (errors) {
        errors.addAll(errorLocal);
      }
      synchronized (results) {
        results.putAll(resultsLocal);
      }
      latch.countDown();
    }
  }

  /**
   * Computes every element of {@code iterator} on {@code numThreads} newly started threads.
   *
   * @param iterator source of items; a {@code null} element is treated as the end of input
   * @param calculation computation applied to each element
   * @param numThreads number of worker threads; must be positive
   * @return results keyed by id (an unordered {@code ConcurrentHashMap})
   * @throws IllegalArgumentException if {@code numThreads < 1}
   * @throws IllegalStateException if interrupted while waiting; the interrupt flag is restored
   * @throws RuntimeException the first recorded failure: unchecked throwables (including
   *     {@link Error}s) are rethrown unchanged, checked ones are wrapped
   */
  public static Map<Integer,Double> runBulkCalc(Iterator<BigThing> iterator, Calc calculation, int numThreads) {
    if (numThreads < 1) {
      // With zero workers nothing would ever be computed, yet an empty map would be returned.
      throw new IllegalArgumentException("numThreads must be positive: " + numThreads);
    }
    final ConcurrentHashMap<Integer, Double> results = new ConcurrentHashMap<Integer, Double>();
    final ArrayList<Throwable> errors = new ArrayList<Throwable>();
    final CountDownLatch latch = new CountDownLatch(numThreads);
    //start up the worker threads
    for (int i = 0; i < numThreads; i++) {
      new Thread(new BulkCalcRunner(calculation, iterator, latch, results, errors)).start();
    }
    try {
      //Latch waits for all the worker threads to check in as "done"
      latch.await();
    } catch (InterruptedException ex) {
      // Workers may still be running, so results and errors are incomplete; don't return them.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for bulk calculation", ex);
    }
    // Each worker merges (under lock) before countDown(), and await() has returned, so all of
    // those writes are visible here without further locking.
    if (!errors.isEmpty()) {
      // Propagate the recorded throwable itself; its getCause() is usually null.
      throw propagate(errors.get(0));
    }
    return results;
  }

  public static Map<Integer,Double> runBulkCalc(Iterator<BigThing> iterator, Calc calculation) {
    return runBulkCalc(iterator,calculation,Runtime.getRuntime().availableProcessors());
  }

  /**
   * Rethrows {@code t} unchanged if it is a RuntimeException or Error, otherwise wraps it in a
   * RuntimeException. Declared to return a RuntimeException so callers can write
   * {@code throw propagate(t)}.
   */
  private static RuntimeException propagate(Throwable t) {
    if (t instanceof RuntimeException) {
      throw (RuntimeException) t;
    }
    if (t instanceof Error) {
      throw (Error) t;
    }
    throw new RuntimeException(t);
  }
}
Run Code Online (Sandbox Code Playgroud)| 归档时间: |
|
| 查看次数: |
2772 次 |
| 最近记录: |