Luk*_*Feo 5 scala google-cloud-dataflow apache-beam
您好,我对动态文件目标 api 感到非常困惑,并且没有文档,所以我在这里。
情况是我有一个 PCollection,它包含属于不同分区的事件。我想将它们分开并将它们写入gcs中的不同文件夹。
这是我所拥有的。
动态目标对象:
class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {
override def getDestination(element: Event): String = {
element.partition //this returns a string which is a gcs folder path
}
override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
println(destination)
val overallPrefix = s"$prefix/$destination/part-"
DefaultFilenamePolicy.fromStandardParameters(
ValueProvider.StaticValueProvider.of(
FileSystems.matchNewResource(overallPrefix, false)),
null, ".jsonl", true)
}
override def formatRecord(record: Event): String = {
implicit val f = DefaultFormats
write(record.toDataLakeFormat())
}
override def getDefaultDestination: String = "default"
}
Run Code Online (Sandbox Code Playgroud)
我相信这是正确的逻辑,我询问每个元素的目标分区是什么,然后将其传递到 getFileNamePolicy 并从那里构建文件名。要格式化记录,我只是将其转换为 json。
问题是将它与 TextIO 集成,我试过这个
TextIO.
write()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))
Run Code Online (Sandbox Code Playgroud)
但它要求源类型为字符串,从技术上讲这可以工作,但我必须多次反序列化。我在文本 io 动态目的地的文档中找到了
Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.
所以让我们试试看
TextIO
.writeCustomType[Event]()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))
Run Code Online (Sandbox Code Playgroud)
这仍然不能编译为 writeCustomType 内部返回TypedWrite<UserT, Void>
,这会产生要求我的动态目标对象的第二个类型参数为 Void 的影响。显然我要求它是一个字符串或者至少是 Void 以外的东西
我显然错过了一些东西
它似乎无法在 scala 中编译,但在深入研究后我能够使用类似的 api 获得我想要的行为
var outputTransform =
TextIO.
writeCustomType[T]()
.withFormatFunction(outputFormatter)
.withNumShards(shards)
.withTempDirectory(tempDir)
.withCompression(compression)
if (windowedWrites) {
outputTransform = outputTransform.withWindowedWrites()
}
outputTransform.to(outputFileNamePolicyMapping, emptryDestination)
Run Code Online (Sandbox Code Playgroud)
其中输出格式化程序从 T 到字符串,outputFileNamePolicyMapping 从 T 到 DefaultFilenamePolicy.Params