java csv parallel-processing nio stream
I am running tests to find the best way to read and process a CSV file: I need to read each line of the file and analyze it. For files with a few thousand lines, basically every approach works fine. But when I try a CSV file with more than a million lines, I get an out-of-memory exception. I expected the parallel stream to be faster, so I am confused about why the out-of-memory error occurs. How does Java handle parallel reads?
Below is the test code that reads the file sequentially and in parallel.
String filename = "c:\\devs\\files\\datas.csv"; // 193MB
Path path = Paths.get(filename);

@Test
public void testFileExist() {
    assertTrue(Files.exists(path));
}

@Test
public void testSingleThreadRead() {
    Function<Path, String> processfile = (Path p) -> {
        String result = "";
        try {
            result = Files.lines(p).collect(Collectors.joining(" ,"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    };

    long start = System.currentTimeMillis();
    String result = processfile.apply(path);
    long end = System.currentTimeMillis();
    assertFalse(result.isEmpty());
    System.out.println(end - start + "ms");
}

@Test
public void testSingleThreadReadParallel() {
    Function<Path, String> processfile = (Path p) -> {
        String result = "";
        try {
            result = Files.lines(p).parallel().collect(Collectors.joining(" ,"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    };

    long start = System.currentTimeMillis();
    String result = processfile.apply(path);
    long end = System.currentTimeMillis();
    assertFalse(result.isEmpty());
    System.out.println(end - start + "ms");
}
Exception:
java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at test.TestProcessFile.lambda$1(TestProcessFile.java:48)
at test.TestProcessFile.testSingleThreadReadParallel(TestProcessFile.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
UPDATE
Running the parallel processing in a separate class, I still get this exception:
Exception in thread "main" java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at ProcessFileParallel.lambda$0(ProcessFileParallel.java:19)
at ProcessFileParallel.main(ProcessFileParallel.java:29)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at java.util.StringJoiner.merge(Unknown Source)
at java.util.stream.Collectors$$Lambda$5/990368553.apply(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$ReduceTask.onCompletion(Unknown Source)
at java.util.concurrent.CountedCompleter.tryComplete(Unknown Source)
at java.util.stream.AbstractTask.compute(Unknown Source)
at java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Your code is failing in testSingleThreadReadParallel, and not because of the parallelism itself. The problem is elsewhere: probably in collecting the whole file into a String.
Files.lines is buffered (look at the implementation), so reading the file is unlikely to cause any problem by itself. But collecting the file into a single String obviously needs a lot of memory, far more than the size of the file itself.
In fact, as I understand it, reading the file in parallel needs more memory than reading it sequentially, not less. Each thread reads its own chunk into memory in parallel, so your parallel approach costs more memory; by "more" I mean roughly the number of your CPUs times the buffer size used by Stream.lines.
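A memory-friendly alternative (a sketch, not the asker's original code) is to process each line as it is streamed in and aggregate a small result, instead of joining every line into one giant String; the heap then only ever holds one line plus the accumulator. The countFields helper below is a hypothetical stand-in for whatever per-line CSV analysis is actually needed:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

public class LineByLine {
    // Hypothetical per-line handler: counts CSV fields as a stand-in for real analysis.
    static long countFields(String line) {
        return line.split(",").length;
    }

    public static void main(String[] args) throws IOException {
        // Small temp file so the sketch is self-contained; the real file would be the 193MB CSV.
        Path path = Files.createTempFile("demo", ".csv");
        Files.write(path, List.of("a,b,c", "1,2,3"));

        long totalFields;
        // try-with-resources closes the underlying file channel when the stream is done.
        try (Stream<String> lines = Files.lines(path)) {
            // Each line is reduced to a long immediately; no joined String is ever built.
            totalFields = lines.mapToLong(LineByLine::countFields).sum();
        }
        System.out.println(totalFields); // 6

        Files.delete(path);
    }
}
```

The same shape works with .parallel() before mapToLong, since summing is an associative reduction that never materializes the whole file.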
EDIT 2
After spending some time on this, I realized your problem must be somewhere else. For instance: does your file actually contain line breaks? Or you are right at the limit; parallel processing does increase memory use, but not by that much. It may be that you just need to increase your -Xms and -Xmx a little.
For example, for testing purposes I created a file with 247MB of dummy data and ran the following code on it:
Path p = Paths.get("/private/tmp/myfile.txt");
Stream<String> s = Files.lines(p).parallel(); // and without parallel
s.forEach(System.out::println);
I used the settings -Xmx200m -Xms200m for both the parallel and the sequential run. That is less than the actual file size, and it still worked fine.
Your main problem is that you collect everything into a single String, which makes it huge. On my machine, under jdk-8, collecting everything into a String needed at least 1.5GB of heap.
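To see why the joined String dwarfs the file itself: Collectors.joining must hold every line plus every separator in one contiguous char array, and the StringBuilder behind it repeatedly reallocates and copies its backing array as it grows (the Arrays.copyOf frame in the stack trace above). A small illustration, with made-up data, of how much the joined result must hold:

```java
import java.util.List;
import java.util.stream.Collectors;

public class JoinCost {
    public static void main(String[] args) {
        // Made-up stand-ins for lines of the CSV file.
        List<String> lines = List.of("row1,a", "row2,b", "row3,c");

        // Joining materializes every line plus all separators in a single char array.
        String joined = lines.stream().collect(Collectors.joining(" ,"));

        // The same size can be computed without ever holding the whole payload:
        // sum of line lengths, plus one 2-char " ," separator between each pair.
        long estimated = lines.stream().mapToLong(String::length).sum()
                + 2L * (lines.size() - 1);

        System.out.println(joined.length() == estimated); // true
    }
}
```

Scaled to a 193MB file, the final char array alone is on the order of the file size, and the intermediate copies made while growing (plus the per-thread partial results that the parallel reduction merges) push the peak well beyond it.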
This is also a good read on the topic.