Rav*_*ila 4 google-cloud-dataflow apache-beam
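In a distributed processing environment, output files usually get "part" names such as "part-000". Is it possible to write some kind of extension for Apache Beam that renames the individual output files (for example, one file name per window)?
To do this, one would probably need to be able to assign a name to a window, or to infer the file name from the window's contents. I would like to know whether such an approach is feasible.
As for whether the solution should be streaming or batch, a streaming-mode example is preferred.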
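Yes. As jkff suggested, you can achieve this with TextIO.write().to(FilenamePolicy).
Examples follow.
If you want to write the output to a particular local path, you can use the line below. Note that the path is treated as a filename prefix, so the files actually written carry a shard suffix unless sharding is disabled (for example with withoutSharding()).
lines.apply(TextIO.write().to("/path/to/file.txt"));
Below is a simple way to write the output using a prefix. This example targets Google Cloud Storage; you can use a local or S3 path instead.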
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MinimalWordCountJava8 {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // In order to run your pipeline, you need to make following runner specific changes:
    //
    // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
    // or FlinkRunner.
    // CHANGE 2/3: Specify runner-required options.
    // For BlockingDataflowRunner, set project and temp location as follows:
    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    //   dataflowOptions.setRunner(BlockingDataflowRunner.class);
    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
    // for more details.
    //   options.as(FlinkPipelineOptions.class)
    //      .setRunner(FlinkRunner.class);
    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
        // Split each line into words.
        .apply(FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
        .apply(Filter.by((String word) -> !word.isEmpty()))
        .apply(Count.<String>perElement())
        // Format each (word, count) pair as a line of text.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
        // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
        .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
    p.run().waitUntilFinish();
  }
}
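To make the prefix behaviour concrete, here is a minimal standalone sketch (plain Java, no Beam dependency) of the names a prefix-based write typically ends up with. It assumes the default unwindowed shard template `-SSSSS-of-NNNNN`, where both numbers are zero-padded to five digits; the class and method names (`DefaultShardNames`, `shardName`, `allShardNames`) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class DefaultShardNames {
    // Mimics the assumed default "-SSSSS-of-NNNNN" template: shard index and
    // shard count are zero-padded to five digits and appended to the prefix.
    static String shardName(String prefix, int shard, int numShards) {
        return String.format(Locale.ROOT, "%s-%05d-of-%05d", prefix, shard, numShards);
    }

    // Lists the names of all shards for a given prefix and shard count.
    static List<String> allShardNames(String prefix, int numShards) {
        List<String> names = new ArrayList<>();
        for (int shard = 0; shard < numShards; shard++) {
            names.add(shardName(prefix, shard, numShards));
        }
        return names;
    }

    public static void main(String[] args) {
        // Prints three names, the first being
        // gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX-00000-of-00003
        for (String name : allShardNames("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX", 3)) {
            System.out.println(name);
        }
    }
}
```

These "part"-style names are what a custom FilenamePolicy lets you replace.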
This sample code gives you more control over how the output is written:
/**
 * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
 * being written. This always includes the shard number and the total number of shards. For
 * windowed writes, it also includes the window and pane index (a sequence number assigned to each
 * trigger firing).
 */
protected static class PerWindowFiles extends FilenamePolicy {
  private final ResourceId prefix;

  public PerWindowFiles(ResourceId prefix) {
    this.prefix = prefix;
  }

  // NOTE: `formatter` is not defined in this snippet. It is assumed to be a Joda-Time
  // DateTimeFormatter field of the enclosing class (for example ISODateTimeFormat.hourMinute()).
  public String filenamePrefixForWindow(IntervalWindow window) {
    String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
    return String.format(
        "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
  }

  // Builds "<prefix>-<windowStart>-<windowEnd>-<shard>-of-<numShards><suffix>".
  @Override
  public ResourceId windowedFilename(int shardNumber,
                                     int numShards,
                                     BoundedWindow window,
                                     PaneInfo paneInfo,
                                     OutputFileHints outputFileHints) {
    IntervalWindow intervalWindow = (IntervalWindow) window;
    String filename =
        String.format(
            "%s-%s-of-%s%s",
            filenamePrefixForWindow(intervalWindow),
            shardNumber,
            numShards,
            outputFileHints.getSuggestedFilenameSuffix());
    return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  }

  // This policy only supports windowed writes.
  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, OutputFileHints outputFileHints) {
    throw new UnsupportedOperationException("Unsupported.");
  }
}

// The expand() method of the enclosing transform: it picks the per-window writer
// for windowed input and the plain prefix-based write otherwise.
@Override
public PDone expand(PCollection<InputT> teamAndScore) {
  if (windowed) {
    teamAndScore
        .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
        .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
  } else {
    teamAndScore
        .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
        .apply(TextIO.write().to(filenamePrefix));
  }
  return PDone.in(teamAndScore.getPipeline());
}
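To see what names the per-window policy would yield, the formatting logic can be tried in isolation. The following is a minimal standalone sketch in plain Java, not the Beam code itself: it swaps Joda-Time for `java.time`, and it assumes the undefined `formatter` prints hour and minute in UTC. The class name `PerWindowFileNames` and the sample prefix, timestamps, and suffix are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PerWindowFileNames {
    // Assumption: the original formatter prints "HH:mm" in UTC.
    private static final DateTimeFormatter FORMATTER =
        DateTimeFormatter.ofPattern("HH:mm").withZone(ZoneOffset.UTC);

    // Same two-step formatting as the policy:
    // "<prefix>-<start>-<end>" followed by "-<shard>-of-<numShards><suffix>".
    static String windowedFilename(
            String prefix, Instant start, Instant end, int shard, int numShards, String suffix) {
        String windowPart =
            String.format("%s-%s-%s", prefix, FORMATTER.format(start), FORMATTER.format(end));
        return String.format("%s-%s-of-%s%s", windowPart, shard, numShards, suffix);
    }

    public static void main(String[] args) {
        // A hypothetical five-minute window starting at 10:00 UTC.
        Instant start = Instant.parse("2017-08-01T10:00:00Z");
        Instant end = start.plus(Duration.ofMinutes(5));
        // prints "output-10:00-10:05-0-of-2.txt"
        System.out.println(windowedFilename("output", start, end, 0, 2, ".txt"));
    }
}
```

Because the window bounds are part of the name, each window firing in a streaming pipeline gets its own set of files. If the files must land on a local filesystem that rejects colons, a different time pattern would be needed.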
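Yes. According to the TextIO documentation:
If you want better control over how filenames are generated than the default policy allows, a custom FilenamePolicy can also be set using TextIO.Write.to(FilenamePolicy)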