小编Anu*_*j J的帖子

有没有办法用 Apache Beam FileIO 为每条记录写一个文件?

我正在学习 Apache Beam 并尝试实现类似于 distcp 的东西。我使用 FileIO.read().filepattern() 来获取输入文件,但是在使用 FileIO.write 写入时,文件有时会合并。

在作业执行之前知道分区计数是不可能的。

PCollection<MatchResult.Metadata> pCollection = pipeline.apply(this.name(), FileIO.match().filepattern(path()))
  .apply(FileIO.readMatches())
  .apply(name(), FileIO.<FileIO.ReadableFile>write()
        .via(FileSink.create())
        .to(path()));
Run Code Online (Sandbox Code Playgroud)

接收器的代码

@AutoValue
public abstract static class FileSink implements FileIO.Sink<FileIO.ReadableFile> {

    private OutputStream outputStream;

    public static FileSink create() {
      return new AutoValue_FileIOOperator_FileSink();
    }

    @Override
    public void open(WritableByteChannel channel) throws IOException {
      outputStream = Channels.newOutputStream(channel);
    }

    @Override
    public void write(FileIO.ReadableFile element) throws IOException {
      try (final InputStream inputStream = Channels.newInputStream(element.open())) {
        IOUtils.copy(inputStream, outputStream);
      }
    }

    @Override
    public void flush() throws IOException …
Run Code Online (Sandbox Code Playgroud)

apache-beam apache-beam-io

1
推荐指数
1
解决办法
2971
查看次数

标签 统计

apache-beam ×1

apache-beam-io ×1