如何在java中使用数据流文本io动态目的地

Luk*_*Feo 5 scala google-cloud-dataflow apache-beam

您好,我对动态文件目标 api 感到非常困惑,并且没有文档,所以我在这里。

情况是我有一个 PCollection,它包含属于不同分区的事件。我想将它们分开并将它们写入gcs中的不同文件夹。

这是我所拥有的。

动态目标对象:

  class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {

    override def getDestination(element: Event): String = {
      element.partition //this returns a string which is a gcs folder path
    }

    override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
      println(destination)
      val overallPrefix = s"$prefix/$destination/part-"
      DefaultFilenamePolicy.fromStandardParameters(
        ValueProvider.StaticValueProvider.of(
          FileSystems.matchNewResource(overallPrefix, false)),
        null, ".jsonl", true)
    }

    override def formatRecord(record: Event): String = {
      implicit val f = DefaultFormats
      write(record.toDataLakeFormat())
    }

    override def getDefaultDestination: String = "default"
  }
Run Code Online (Sandbox Code Playgroud)

我相信这是正确的逻辑,我询问每个元素的目标分区是什么,然后将其传递到 getFileNamePolicy 并从那里构建文件名。要格式化记录,我只是将其转换为 json。

问题是将它与 TextIO 集成,我试过这个

TextIO.
  write()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gcs://bucket"))
Run Code Online (Sandbox Code Playgroud)

但它要求源类型为字符串,从技术上讲这可以工作,但我必须多次反序列化。我在文本 io 动态目的地的文档中找到了

Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.

所以让我们试试看

  TextIO
    .writeCustomType[Event]()
    .withWindowedWrites()
    .withTempDirectory(tempDir)
    .to(new GCSDestinationString("gcs://bucket"))
Run Code Online (Sandbox Code Playgroud)

这仍然不能编译为 writeCustomType 内部返回TypedWrite<UserT, Void>,这会产生要求我的动态目标对象的第二个类型参数为 Void 的影响。显然我要求它是一个字符串或者至少是 Void 以外的东西

我显然错过了一些东西

Luk*_*Feo 1

它似乎无法在 scala 中编译,但在深入研究后我能够使用类似的 api 获得我想要的行为

 var outputTransform =
        TextIO.
          writeCustomType[T]()
          .withFormatFunction(outputFormatter)
          .withNumShards(shards)
          .withTempDirectory(tempDir)
          .withCompression(compression)

      if (windowedWrites) {
        outputTransform = outputTransform.withWindowedWrites()
      }

      outputTransform.to(outputFileNamePolicyMapping, emptryDestination)
Run Code Online (Sandbox Code Playgroud)

其中输出格式化程序从 T 到字符串,outputFileNamePolicyMapping 从 T 到 DefaultFilenamePolicy.Params