ljh*_*ssy 2 java google-cloud-dataflow apache-beam apache-beam-io
我正在关注这篇文章和文档的答案,以便在管道结束时对我的数据执行动态窗口写入。这是我到目前为止所拥有的:
static void applyWindowedWrite(PCollection<String> stream) {
stream.apply(
FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://some_bucket/events/")
.withNaming(key -> defaultNaming(key, ".json")));
}
Run Code Online (Sandbox Code Playgroud)
但是 NetBeans 在最后一行警告我语法错误:
FileNaming is not public in Write; cannot be accessed outside package
如何使defaultNaming我的管道可用,以便我可以将其用于动态写入。或者,如果这不可能,我应该做什么?
发布我发现的内容,以防其他人遇到此问题。
我writeDynamic()之前尝试使用的方式存在三个问题。
FileNaming为FileIO.Write. Beam 2.4.0 定义FileNaming为public static interface使其在外部可用。defaultNaming. 而不是defaultNaming直接调用- 正如它在示例文档中所调用的那样 - 它必须被调用,FileIO.Write.defaultNaming因为它FileIO是我实际导入的包。withDestinationCoder执行动态写入也需要添加。最终的解决方案看起来像这样。
static void applyWindowedWrite(PCollection<String> stream) {
stream.apply(FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://some_bucket/events/")
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1)
.withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}
Run Code Online (Sandbox Code Playgroud)
Event::getKey与签名在同一包中定义的静态函数在哪里public static String getKey(String event)。
这将执行一个窗口写入,它将为每个窗口写入一个文件(由.withNumShards(1)方法定义)。这假设窗口已在上一步中定义。GroupByKey在写入之前不需要A ,因为只要明确定义了分片数量,它就会在写入过程中完成。有关“写入文件 -> 每个窗格生成多少个分片”下的更多详细信息,请参阅FileIO 文档。
| 归档时间: |
|
| 查看次数: |
1300 次 |
| 最近记录: |