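I am learning Apache Beam and trying to implement something similar to distcp. I use FileIO.match().filepattern() followed by FileIO.readMatches() to get the input files, but when I write them out with FileIO.write, the files are sometimes merged.
It is not possible to know the partition count before the job runs.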
PCollection<MatchResult.Metadata> pCollection = pipeline.apply(this.name(), FileIO.match().filepattern(path()))
    .apply(FileIO.readMatches())
    .apply(name(), FileIO.<FileIO.ReadableFile>write()
        .via(FileSink.create())
        .to(path()));
The code for the sink:
@AutoValue
public abstract static class FileSink implements FileIO.Sink<FileIO.ReadableFile> {
    private OutputStream outputStream;

    public static FileSink create() {
        return new AutoValue_FileIOOperator_FileSink();
    }

    @Override
    public void open(WritableByteChannel channel) throws IOException {
        outputStream = Channels.newOutputStream(channel);
    }

    @Override
    public void write(FileIO.ReadableFile element) throws IOException {
        try (final InputStream inputStream = Channels.newInputStream(element.open())) {
            IOUtils.copy(inputStream, outputStream);
        }
    }

    @Override
    public void flush() throws IOException …
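For context, the merging is probably a consequence of how a FileIO.Sink is driven: open() is called once per output file (shard), and write() is then called once for every element that lands in that shard. If the runner places two ReadableFile elements in the same shard, the sink above copies both into the same channel. The sketch below reproduces that effect with plain JDK classes only, with no Beam dependency. ShardMergeDemo, CopySink, writeAsOneShard and writeOneShardPerInput are hypothetical names invented for this illustration, and strings stand in for file contents.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShardMergeDemo {

    /** Hypothetical stand-in for the sink: opened once, then written to once per element. */
    static final class CopySink {
        private OutputStream out;

        void open(WritableByteChannel channel) {
            out = Channels.newOutputStream(channel);
        }

        void write(byte[] fileContents) throws IOException {
            out.write(fileContents);
        }

        void flush() throws IOException {
            out.flush();
        }
    }

    /** Sends every input through ONE sink and channel, like a shard that receives several elements. */
    public static String writeAsOneShard(List<String> inputs) {
        ByteArrayOutputStream shard = new ByteArrayOutputStream();
        CopySink sink = new CopySink();
        try {
            sink.open(Channels.newChannel(shard));
            for (String input : inputs) {
                sink.write(input.getBytes(StandardCharsets.UTF_8));
            }
            sink.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(shard.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Gives each input its own sink and channel, so nothing is concatenated. */
    public static List<String> writeOneShardPerInput(List<String> inputs) {
        List<String> outputs = new ArrayList<>();
        for (String input : inputs) {
            outputs.add(writeAsOneShard(Collections.singletonList(input)));
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("contents of a.txt\n", "contents of b.txt\n");
        // One shard: both inputs end up in a single merged output.
        System.out.print(writeAsOneShard(inputs));
        // One shard per input: the number of outputs equals the number of inputs.
        System.out.println(writeOneShardPerInput(inputs).size());
    }
}
```

If this is indeed the cause, one direction that may be worth trying is FileIO.writeDynamic() with the source file name as the destination, so that each input file maps to its own output. This is a suggestion rather than a verified fix for the pipeline above.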