Ale*_*x R 11 java java-8 java-stream spliterator
我想使用 Stream 并行处理一组异构的、存储在远程的 JSON 文件,文件的数量未知(事先无法得知有多少个文件)。文件的大小可以相差很大,从每个文件 1 条 JSON 记录到其他文件中的 100,000 条记录。这里的 JSON 记录是指在文件中以一行表示的自包含 JSON 对象。
我真的很想为此使用 Stream,所以我实现了下面这个 Spliterator:
/**
 * Spliterator over records parsed from a set of line-delimited JSON files.
 *
 * <p>File paths are drawn from a single {@link Iterator} that is shared by this spliterator and
 * every spliterator created by {@link #trySplit()}; access to the iterator is serialized by
 * synchronizing on it. Each spliterator reads one file at a time and moves on to the next
 * available path when its current file is exhausted.
 *
 * @param <METADATA> per-file metadata type exposed by the opened reader
 * @param <RECORD>   element type produced by {@link #parse(Object, Map)}
 */
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    /**
     * Opens the reader for the file identified by {@code path}.
     *
     * @param path a path taken from the shared path iterator
     * @return the reader used to pull JSON lines from that file
     */
    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    /**
     * Converts one JSON line into a record.
     *
     * @param metadata the metadata reported by the reader of the current file
     * @param json     the JSON object read from one line of the file
     * @return the record handed to the downstream consumer
     */
    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    // NOTE(review): IMMUTABLE and DISTINCT are asserted here, not enforced by this class --
    // confirm that the sources and parse() really guarantee them.
    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;

    /** Upper bound on the number of records moved into a batch by {@link #trySplit()}. */
    private static final int MAX_BUFFER = 100;

    /** Path source shared with every spliterator split off from this one. */
    private final Iterator<String> paths;

    /** Reader of the file currently being consumed, or {@code null} when no file is open. */
    private JsonStreamSupport<METADATA> reader = null;

    /** Path assigned at construction time that has not been opened yet, or {@code null}. */
    private String pendingPath;

    /**
     * Creates a spliterator that reads the files named by {@code paths}.
     *
     * @param paths iterator over the file paths; it is also used as the lock guarding itself
     */
    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    /**
     * Creates a spliterator that starts with {@code nextPath} before drawing from {@code paths}.
     *
     * <p>The file is opened lazily on the first {@link #tryAdvance(Consumer)} rather than here:
     * opening it in the constructor would invoke the overridable {@link #openInputStream(String)}
     * during construction and would leave the reader open if the split is never traversed.
     */
    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        this.pendingPath = nextPath;
    }

    /**
     * Hands the next record to {@code action}, opening further files as needed.
     *
     * @param action consumer receiving the parsed record
     * @return {@code true} if a record was delivered, {@code false} when the current file is
     *         exhausted and no more paths are available
     */
    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        // Iterative on purpose: a long run of empty files must not grow the call stack.
        while (true) {
            if (reader == null) {
                String path = (pendingPath != null) ? pendingPath : takeNextPath();
                pendingPath = null;
                if (path == null) {
                    return false;
                }
                open(path);
            }
            Map<String, Object> json = reader.readJsonLine();
            if (json != null) {
                RECORD item = parse(reader.getMetadata(), json);
                action.accept(item);
                return true;
            }
            // Current file is exhausted: forget it before closing so a failing close()
            // cannot leave a closed reader behind for the next call.
            JsonStreamSupport<METADATA> exhausted = reader;
            reader = null;
            exhausted.close();
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    /** Returns the next path from the shared iterator, or {@code null} when it is exhausted. */
    private String takeNextPath() {
        synchronized (paths) {
            if (paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    /**
     * Splits off work for another thread.
     *
     * <p>While unclaimed paths remain, the returned spliterator is assigned the next path and
     * shares the path iterator with this one. Once the paths are used up, up to
     * {@code MAX_BUFFER} records are read from this spliterator and returned as a batch.
     *
     * @return a spliterator covering the split-off work, or {@code null} if nothing is left
     */
    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if (nextPath != null) {
            // The child delegates the abstract hooks back to this instance.
            return new JsonStreamSpliterator<METADATA, RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }

                @Override
                protected RECORD parse(METADATA metaData, Map<String, Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };
        }

        // No whole files left to hand out: peel a bounded batch off the current file.
        List<RECORD> records = new ArrayList<RECORD>();
        while (tryAdvance(records::add) && records.size() < MAX_BUFFER) {
            // keep filling the batch
        }
        return records.isEmpty() ? null : records.spliterator();
    }
}
Run Code Online (Sandbox Code Playgroud)
我遇到的问题是,虽然 Stream 一开始并行化得很漂亮,但最终最大的那个文件仍然只由单个线程处理。我认为其直接原因已有充分的文档说明:Spliterator “不平衡”。
更具体地说,在 Stream.forEach 生命周期中的某个时间点之后,trySplit 方法似乎就不再被调用了,因此 trySplit 末尾用于分发小批量记录的额外逻辑很少被执行。
请注意,从 trySplit 返回的所有 Spliterator 都共享同一个 paths 迭代器。我认为这是在所有 Spliterator 之间平衡工作的一种非常巧妙的方法,但它还不足以实现完全并行。
我希望并行处理首先在文件之间进行;然后,当只剩下少数几个大文件仍在处理时,我希望在这些剩余文件的数据块之间进行并行处理。这就是 trySplit 末尾 else 分支的意图。
是否有解决此问题的简单/直接/规范的方法?
经过多次实验,我仍然无法通过调整大小估计来获得任何额外的并行性。基本上,除了 之外的任何值Long.MAX_VALUE都会导致 spliterator 过早终止(并且没有任何分裂),而另一方面,估计Long.MAX_VALUE将导致trySplit被无情地调用,直到它返回null。
我找到的解决方案是在各个 Spliterator 之间内部共享资源,并让它们在彼此之间重新平衡工作。
工作代码:
/**
 * Spliterator that reads lines from a set of S3 objects and rebalances the work between all
 * spliterators split off from the same root.
 *
 * <p>Every spliterator created by {@link #trySplit()} shares the same file iterator, the same
 * queues of unopened and partially read files, and the same line buffer. A spliterator that runs
 * out of buffered lines takes any available file, reads up to {@code maxBuffer} lines from it
 * into the shared buffer, and puts the file back so that any spliterator can continue it.
 *
 * <p>The line buffer and the incoming-file iterator are guarded by synchronizing on them; the
 * two file queues are expected to be concurrent deques.
 *
 * @param <LINE> type of a single line item produced by the line reader
 */
public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    /** A line item together with the S3 object it was read from. */
    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;

        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    /**
     * An opened file that may still have unread lines. Static because instances are shared
     * between spliterators and need no reference to the one that created them.
     */
    private static final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;

        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    /** Source of files not yet claimed by any spliterator; also used as its own lock. */
    private final Iterator<S3ObjectSummary> incomingFiles;
    private final Function<S3ObjectSummary, InputStream> fileOpener;
    /** Reads the next line from a stream; a {@code null} result is treated as end of file. */
    private final Function<InputStream, LINE> lineReader;
    /** Files claimed by {@link #trySplit()} but not opened yet. */
    private final Deque<S3ObjectSummary> unopenedFiles;
    /** Opened files that are not currently being read by any spliterator. */
    private final Deque<InputStreamHandler> openedFiles;
    /** Lines read but not yet delivered; guarded by synchronizing on the deque itself. */
    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;
    /** Maximum number of lines read from one file before it is handed back. */
    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    /**
     * Creates the root spliterator.
     *
     * @param incomingFiles iterator over the S3 objects to read
     * @param fileOpener    opens the input stream of an S3 object
     * @param lineReader    reads the next line from a stream, returning {@code null} at end of file
     * @param maxBuffer     number of lines read from a file per refill; must be positive
     * @throws IllegalArgumentException if {@code maxBuffer} is not positive
     */
    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(requirePositive(maxBuffer)), maxBuffer);
    }

    /** Rejects a non-positive chunk size, with which a refill could never make progress. */
    private static int requirePositive(int maxBuffer) {
        if (maxBuffer <= 0) {
            throw new IllegalArgumentException("maxBuffer must be positive: " + maxBuffer);
        }
        return maxBuffer;
    }

    /**
     * Delivers one buffered line to {@code action}, refilling the shared buffer from any
     * available file when it is empty.
     *
     * @param action consumer receiving the line
     * @return {@code true} if a line was delivered; {@code false} when the buffer is empty and
     *         this spliterator found no file left to read
     */
    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        // Iterative on purpose: a run of empty files must not grow the call stack.
        while (true) {
            AwsS3LineInput<LINE> lineInput;
            synchronized (sharedBuffer) {
                lineInput = sharedBuffer.poll();
            }
            if (lineInput != null) {
                action.accept(lineInput);
                return true;
            }

            // Prefer continuing a partially read file over opening a new one.
            InputStreamHandler handle = openedFiles.poll();
            if (handle == null) {
                S3ObjectSummary unopenedFile = nextUnopenedFile();
                if (unopenedFile == null) {
                    return false;
                }
                handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
            }

            if (refill(handle)) {
                // More lines may remain: make the file available to every spliterator again.
                openedFiles.addFirst(handle);
            }
        }
    }

    /**
     * Returns the next file to open, or {@code null} if none is left.
     *
     * <p>Falls back to the incoming iterator when the unopened queue is empty. Files are only
     * moved into that queue by {@link #trySplit()}, so without the fallback any file the
     * framework never split on (for example in a sequential stream) would be silently skipped.
     */
    private S3ObjectSummary nextUnopenedFile() {
        S3ObjectSummary queued = unopenedFiles.poll();
        if (queued != null) {
            return queued;
        }
        synchronized (incomingFiles) {
            return incomingFiles.hasNext() ? incomingFiles.next() : null;
        }
    }

    /**
     * Reads up to {@code maxBuffer} lines from {@code handle} into the shared buffer.
     *
     * @return {@code true} if the chunk was filled without reaching end of file, {@code false}
     *         if the file is exhausted (its stream has then been closed)
     */
    private boolean refill(InputStreamHandler handle) {
        for (int i = 0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if (line == null) {
                close(handle);
                return false;
            }
            synchronized (sharedBuffer) {
                sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
            }
        }
        return true;
    }

    /** Closes the stream of an exhausted file so the underlying connection is released. */
    private void close(InputStreamHandler handle) {
        if (handle.inputStream == null) {
            return;
        }
        try {
            handle.inputStream.close();
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to close input stream of " + handle.file, e);
        }
    }

    /**
     * Claims the next incoming file for a new spliterator that shares all state with this one.
     *
     * @return a new spliterator, or {@code null} when no incoming file is left
     */
    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized (incomingFiles) {
            if (!incomingFiles.hasNext()) {
                return null;
            }
            unopenedFiles.add(incomingFiles.next());
            return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)