How do I write to multiple files in Apache Beam?

abc*_*987 5 google-cloud-dataflow apache-beam

Let me simplify my case. I'm using Apache Beam 0.6.0. The final result of my processing is a PCollection<KV<String, String>>, and I would like to write the values to different files corresponding to their keys.

For example, suppose the result contains

(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)

Then I would like to write value1, value3 and value4 into key1.txt, and write value2 into key2.txt.

In my case:

  • The set of keys is determined when the pipeline runs, not when the pipeline is constructed.
  • The set of keys may be quite small, but the number of values corresponding to each key may be very, very large.

Any ideas?

Cas*_*alT 5

As it happens, I wrote a sample for exactly this case just the other day.

This sample is in the Dataflow 1.x style.

Basically, you group by each key, and then you can do this with a custom transform that connects to Cloud Storage. One caveat: the list of lines per file shouldn't be large (it has to fit into the memory of a single instance, but given that you can run high-memory instances, that limit is pretty high).

...
PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
        .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));
readyToWrite.apply(
        new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
...
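AccumulatorOfWords.getCombineFn() is a helper from my project that isn't shown here; any CombineFn that folds every value for a key into one List<String> will do. A minimal sketch, assuming it does nothing more than accumulate the strings (the body below is an illustration, not the original helper):

import java.util.ArrayList;
import java.util.List;

import com.google.cloud.dataflow.sdk.transforms.Combine;

public class AccumulatorOfWords {

    // Collects every String seen for a key into a single List<String>.
    public static Combine.CombineFn<String, List<String>, List<String>> getCombineFn() {
        return new Combine.CombineFn<String, List<String>, List<String>>() {

            @Override
            public List<String> createAccumulator() {
                return new ArrayList<>();
            }

            @Override
            public List<String> addInput(final List<String> accumulator, final String input) {
                accumulator.add(input);
                return accumulator;
            }

            @Override
            public List<String> mergeAccumulators(final Iterable<List<String>> accumulators) {
                final List<String> merged = new ArrayList<>();
                for (final List<String> accumulator : accumulators) {
                    merged.addAll(accumulator);
                }
                return merged;
            }

            @Override
            public List<String> extractOutput(final List<String> accumulator) {
                return accumulator;
            }
        };
    }
}

Note that the accumulator holds every line for a key at once, which is exactly where the memory caveat above comes from.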

The transform that does most of the work is then:

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class PTransformWriteToGCS
        extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {

    private static final Logger LOG = LoggerFactory.getLogger(PTransformWriteToGCS.class);

    // One GCS client per worker JVM; the client is not serializable,
    // so it lives in a static field rather than in the transform instance.
    private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

    private final String bucketName;

    // Maps a key to the object path it should be written to within the bucket.
    private final SerializableFunction<String, String> pathCreator;

    public PTransformWriteToGCS(final String bucketName,
            final SerializableFunction<String, String> pathCreator) {
        this.bucketName = bucketName;
        this.pathCreator = pathCreator;
    }

    @Override
    public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {

        return input.apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {

            @Override
            public void processElement(final ProcessContext context) throws Exception {
                // Each element carries one key plus every value collected for it.
                final String key = context.element().getKey();
                final List<String> values = context.element().getValue();

                // Join the values into the file content and derive the object path.
                final String toWrite = values.stream().collect(Collectors.joining("\n"));
                final String path = pathCreator.apply(key);

                final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
                        .setContentType(MimeTypes.TEXT)
                        .build();
                LOG.info("blob writing to: {}", blobInfo);

                // Write the whole file to Cloud Storage in a single call.
                STORAGE.create(blobInfo, toWrite.getBytes(StandardCharsets.UTF_8));
            }
        }));
    }

}
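For completeness, TonyWordGrouper::derivePath is just a method reference used as the SerializableFunction<String, String> pathCreator. A hypothetical version (the naming scheme here is an assumption, not the original code):

public class TonyWordGrouper {

    // Hypothetical: derive a GCS object name from the key, one file per key.
    public static String derivePath(final String key) {
        return "output/" + key + ".txt";
    }
}

With that, each key's values end up in their own object, e.g. gs://dataflow-experiment/output/key1.txt and gs://dataflow-experiment/output/key2.txt.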