Mar*_*nik · score 14 · tags: java, parallel-processing, java-8
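I cannot achieve good parallelization of stream processing when the stream source is a Reader. Running the code below on a quad-core CPU, I first observe 3 cores being used, then a sudden drop to two cores, and then to one. Overall CPU utilization hovers around 50%.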
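Note the following characteristics of the example:
- there are just 6,000 lines (see createInput);
- each line takes on the order of 20 ms of CPU time to process (about 110 s of CPU time over 6,000 lines in the output below);
- the whole run takes about a minute.
This means all the pressure is on the CPU, and I/O is minimal. The example is a sitting duck for automatic parallelization.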
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong;

public class Main
{
    // CPU time (ns) spent inside processLine, summed over all worker threads
    static final AtomicLong totalTime = new AtomicLong();

    public static void main(String[] args) throws IOException {
        final long start = System.nanoTime();
        final Path inputPath = createInput();
        System.out.println("Start processing");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
            Files.lines(inputPath).parallel().map(Main::processLine)
                 .forEach(w::println);
        }
        final double cpuTime = totalTime.get(),
                     realTime = System.nanoTime() - start;
        final int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(" Cores: " + cores);
        System.out.format(" CPU time: %.2f s\n", cpuTime / SECONDS.toNanos(1));
        System.out.format(" Real time: %.2f s\n", realTime / SECONDS.toNanos(1));
        System.out.format("CPU utilization: %.2f%%", 100.0 * cpuTime / realTime / cores);
    }

    // Deliberately CPU-heavy work: O(length^2) calls to Math.pow per line
    private static String processLine(String line) {
        final long localStart = System.nanoTime();
        double ret = 0;
        for (int i = 0; i < line.length(); i++)
            for (int j = 0; j < line.length(); j++)
                ret += Math.pow(line.charAt(i), line.charAt(j) / 32.0);
        final long took = System.nanoTime() - localStart;
        totalTime.getAndAdd(took);
        return NANOSECONDS.toMillis(took) + " " + ret;
    }

    // Writes 6,000 lines, each consisting of one nanoTime value repeated 25 times
    private static Path createInput() throws IOException {
        final Path inputPath = Paths.get("input.txt");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
            for (int i = 0; i < 6_000; i++) {
                final String text = String.valueOf(System.nanoTime());
                for (int j = 0; j < 25; j++) w.print(text);
                w.println();
            }
        }
        return inputPath;
    }
}
My typical output:
Cores: 4
CPU time: 110.23 s
Real time: 53.60 s
CPU utilization: 51.41%
For comparison, if I use a slightly modified variant where I first collect into a list and then process:
Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
     .forEach(w::println);
I get this typical output:
Cores: 4
CPU time: 138.43 s
Real time: 35.00 s
CPU utilization: 98.87%
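What can explain this effect, and how can I fix it to get full utilization?
Note that I originally observed this on a reader over a servlet input stream, so it is not specific to a FileReader.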
Here is the answer, spelled out in the source code of Spliterators.IteratorSpliterator, the spliterator used by BufferedReader#lines():
@Override
public Spliterator<T> trySplit() {
    /*
     * Split into arrays of arithmetically increasing batch
     * sizes. This will only improve parallel performance if
     * per-element Consumer actions are more costly than
     * transferring them into an array. The use of an
     * arithmetic progression in split sizes provides overhead
     * vs parallelism bounds that do not particularly favor or
     * penalize cases of lightweight vs heavyweight element
     * operations, across combinations of #elements vs #cores,
     * whether or not either are known. We generate
     * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
     * potential speedup.
     */
    Iterator<? extends T> i;
    long s;
    if ((i = it) == null) {
        i = it = collection.iterator();
        s = est = (long) collection.size();
    }
    else
        s = est;
    if (s > 1 && i.hasNext()) {
        int n = batch + BATCH_UNIT;
        if (n > s)
            n = (int) s;
        if (n > MAX_BATCH)
            n = MAX_BATCH;
        Object[] a = new Object[n];
        int j = 0;
        do { a[j] = i.next(); } while (++j < n && i.hasNext());
        batch = j;
        if (est != Long.MAX_VALUE)
            est -= j;
        return new ArraySpliterator<>(a, 0, j, characteristics);
    }
    return null;
}
Also noteworthy are the constants:
static final int BATCH_UNIT = 1 << 10; // batch array size increment
static final int MAX_BATCH = 1 << 25; // max batch array size;
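So in my example, which uses 6,000 elements, I get just three batches because the batch size grows in steps of 1024: the batches hold 1024, 2048 and then the remaining 2928 elements (a full third batch would be 3072). This exactly explains my observation: initially three cores are used, and as the smaller batches complete, usage drops to two cores and then to one. In the meantime I tried a modified example with 60,000 elements, and then I get almost 100% CPU utilization.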
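The batch arithmetic can be checked directly. The sketch below (the class IteratorSplitSizes and its method are hypothetical names) repeatedly calls trySplit on a spliterator obtained from Spliterators.spliteratorUnknownSize over a plain Iterator, which is the kind of spliterator BufferedReader#lines() builds, and records the size of each batch split off. It assumes the JDK's IteratorSpliterator still uses the batching logic quoted above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;

public final class IteratorSplitSizes {
    // Builds an unknown-size, iterator-backed spliterator (like BufferedReader.lines())
    // and keeps calling trySplit() on it, recording each split-off batch's size.
    // Assumption: the JDK's IteratorSpliterator batching (BATCH_UNIT = 1024) is unchanged.
    static List<Long> splitSizes(int elements) {
        Spliterator<String> sp = Spliterators.spliteratorUnknownSize(
                Collections.nCopies(elements, "line").iterator(), Spliterator.ORDERED);
        List<Long> sizes = new ArrayList<>();
        for (Spliterator<String> batch; (batch = sp.trySplit()) != null; ) {
            sizes.add(batch.estimateSize()); // each batch is an array-backed spliterator
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(splitSizes(6_000));         // [1024, 2048, 2928]
        System.out.println(splitSizes(60_000).size()); // 11
    }
}
```

With 60,000 elements the first ten batches (1024 up to 10240) cover 56,320 elements and an eleventh takes the remaining 3,680, which is why the larger input gives the pool enough pieces to keep all cores busy.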
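To solve my problem, I developed the code below, which lets me transform any existing stream into one whose Spliterator#trySplit splits it into batches of a specified size. The simplest way to use it for the use case from my question is: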
toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
At a lower level, the class below is a spliterator wrapper that changes the trySplit behavior of the wrapped spliterator and leaves its other aspects unchanged.
import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
    private final Spliterator<T> spliterator;
    private final int batchSize;
    private final int characteristics;
    private long est;

    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
        final int c = toWrap.characteristics();
        // A SIZED source also becomes SUBSIZED: every split-off batch knows its exact size
        this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
        this.spliterator = toWrap;
        this.est = est;
        this.batchSize = batchSize;
    }

    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
        this(toWrap, toWrap.estimateSize(), batchSize);
    }

    public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
        return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
    }

    // Splits off at most batchSize elements, buffered into an array-backed spliterator
    @Override public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!spliterator.tryAdvance(holder)) return null;
        final Object[] a = new Object[batchSize];
        int j = 0;
        do {
            a[j] = holder.value;
        } while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j;
        return spliterator(a, 0, j, characteristics());
    }

    @Override public boolean tryAdvance(Consumer<? super T> action) {
        return spliterator.tryAdvance(action);
    }

    @Override public void forEachRemaining(Consumer<? super T> action) {
        spliterator.forEachRemaining(action);
    }

    // Delegate to the wrapped spliterator, which throws IllegalStateException if it is not SORTED
    @Override public Comparator<? super T> getComparator() {
        return spliterator.getComparator();
    }

    @Override public long estimateSize() { return est; }

    @Override public int characteristics() { return characteristics; }

    // Captures the single element passed to tryAdvance
    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;
        @Override public void accept(T value) { this.value = value; }
    }
}
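As a quick check of the wrapping idea, here is a compact, hypothetical variant (FixedBatch, fixedBatchStream and fixedSplitSizes are illustrative names, not the class above) that extends Spliterators.AbstractSpliterator and overrides only tryAdvance and trySplit. Unlike FixedBatchSpliteratorWrapper it does not track the remaining estimate, so it clears SIZED and SUBSIZED. Fed the same kind of 6,000-element, unknown-size source as the Reader case, it splits into equal batches of 20 instead of three growing ones.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public final class FixedBatchDemo {
    // Hypothetical compact variant of the wrapper: AbstractSpliterator supplies
    // estimateSize()/characteristics(); only tryAdvance() and trySplit() are overridden.
    static final class FixedBatch<T> extends Spliterators.AbstractSpliterator<T> {
        private final Spliterator<T> source;
        private final int batchSize;

        FixedBatch(Spliterator<T> source, int batchSize) {
            // SIZED/SUBSIZED are cleared because the estimate is not reduced as batches leave
            super(source.estimateSize(),
                  source.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.source = source;
            this.batchSize = batchSize;
        }

        @Override public boolean tryAdvance(Consumer<? super T> action) {
            return source.tryAdvance(action);
        }

        @Override public Spliterator<T> trySplit() {
            Object[] buf = new Object[batchSize];
            int[] n = {0};
            // Pull up to batchSize elements from the source into the buffer
            while (n[0] < batchSize && source.tryAdvance(x -> buf[n[0]++] = x)) {
                // keep filling
            }
            if (n[0] == 0) return null; // source exhausted: no more splits
            return Spliterators.spliterator(buf, 0, n[0], characteristics());
        }
    }

    static <T> Stream<T> fixedBatchStream(Stream<T> in, int batchSize) {
        return StreamSupport.stream(new FixedBatch<>(in.spliterator(), batchSize), true);
    }

    // Sizes of the batches split off an unknown-size source of the given length
    static List<Long> fixedSplitSizes(int elements, int batchSize) {
        Spliterator<String> src = Spliterators.spliteratorUnknownSize(
                Collections.nCopies(elements, "line").iterator(), Spliterator.ORDERED);
        Spliterator<String> sp = new FixedBatch<>(src, batchSize);
        List<Long> sizes = new ArrayList<>();
        for (Spliterator<String> b; (b = sp.trySplit()) != null; ) sizes.add(b.estimateSize());
        return sizes;
    }

    public static void main(String[] args) {
        List<Long> sizes = fixedSplitSizes(6_000, 20);
        System.out.println(sizes.size() + " batches, first of size " + sizes.get(0)); // 300 batches, first of size 20
        // Parallel use loses no elements
        System.out.println(fixedBatchStream(Collections.nCopies(6_000, "x").stream(), 20).count()); // 6000
    }
}
```

In the question's pipeline this would read fixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20). Keeping the estimate constant is a simplification: the framework keeps splitting the remainder until trySplit returns null, which is what a CPU-bound, unknown-size source wants here.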
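This problem is solved to some extent in the Java 9 early-access builds. Files.lines has been rewritten, and when splitting it now actually jumps into the middle of the memory-mapped file. Here are the results on my machine (it has 4 hyperthreaded cores = 8 hardware threads):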
Java 8u60:
Start processing
Cores: 8
CPU time: 73,50 s
Real time: 36,54 s
CPU utilization: 25,15%
Java 9b82:
Start processing
Cores: 8
CPU time: 79,64 s
Real time: 10,48 s
CPU utilization: 94,95%
As you can see, both the real time and the CPU utilization improved dramatically.
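This optimization has some limitations, though. Currently it works only for a few encodings (namely UTF-8, ISO_8859_1 and US_ASCII), because for an arbitrary encoding you do not know exactly how the line break is encoded. It is limited to files no larger than 2 GB (due to the limitations of MappedByteBuffer in Java), and of course it does not work for some non-regular files (such as character devices or named pipes, which cannot be memory-mapped). In such cases the old implementation is used as a fallback.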
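The restrictions above can be expressed as a rough precondition check. The sketch below is hypothetical: meetsFastSplitConditions is not a JDK API, it only mirrors the three conditions listed (supported charset, size up to Integer.MAX_VALUE bytes, regular file), and the JDK's internal decision may differ between versions. It also shows a parallel Files.lines stream opened with an explicit supported charset.

```java
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public final class MappedLinesCheck {
    // Hypothetical helper mirroring the stated restrictions; not the JDK's actual check.
    static boolean meetsFastSplitConditions(Path file, Charset cs) throws IOException {
        boolean supportedCharset = cs.equals(StandardCharsets.UTF_8)
                || cs.equals(StandardCharsets.ISO_8859_1)
                || cs.equals(StandardCharsets.US_ASCII);
        return supportedCharset
                && Files.isRegularFile(file)
                && Files.size(file) <= Integer.MAX_VALUE; // MappedByteBuffer is int-indexed
    }

    // Counts lines of at least minLength chars using a parallel Files.lines stream
    static long countLongLines(Path file, int minLength) throws IOException {
        try (Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
            return lines.parallel().filter(l -> l.length() >= minLength).count();
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("lines", ".txt");
        try {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 10_000; i++) sb.append(i).append('\n');
            Files.write(tmp, sb.toString().getBytes(StandardCharsets.UTF_8));
            System.out.println(meetsFastSplitConditions(tmp, StandardCharsets.UTF_8)); // true
            System.out.println(countLongLines(tmp, 4)); // 9000 (lines "1000" to "9999")
        } finally {
            Files.delete(tmp);
        }
    }
}
```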