ParDo中的侧输出 Apache Beam Python SDK

IYY*_*IYY 1 google-cloud-dataflow apache-beam

由于文档仅适用于JAVA,我无法理解其含义.

它声明 - "虽然ParDo总是产生一个主输出PCollection(作为应用的返回值),你也可以让你的ParDo产生任意数量的额外输出PCollections.如果你选择有多个输出,你的ParDo将返回所有的输出PCollections(包括主输出)捆绑在一起.例如,在Java中,输出PCollections捆绑在一个类型安全的PCollectionTuple中."

我理解捆绑在一起意味着什么,但如果我在我的DoFn中产生一个标签,它是否会产生一个包含所有其他输出的空包,并在代码中遇到它们时产生其他输出?或者它等待所有产量准备好输入并将它们全部输出到一起?

文档中没有太多清晰度.虽然我认为它不会等待,只是遇到收益,但我仍然需要了解发生了什么.

Pab*_*blo 6

回答这个问题的最佳方法是举个例子.这个例子在Beam中可用.

假设您要运行字数统计管道(例如,计算每个单词在文档中出现的次数).为此,您需要将文件中的行拆分为单个单词.考虑一下,您还需要单独计算字长.你的分裂变换是这样的:

with beam.Pipeline(options=pipeline_options) as p:

    lines = p | ReadFromText(known_args.input)  # Read in the file

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    split_lines_result = (lines
                          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
                              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                              main='words'))

    short_words = split_lines_result['words']
    character_count = split_lines_result[
        SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]
Run Code Online (Sandbox Code Playgroud)

在这种情况下,每个都是不同的PCollection,具有正确的元素.在DoFn将负责分裂其输出,并且它由元素标记做的.看到:

class SplitLinesToWordsFn(beam.DoFn):
  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'

  def process(self, element):
    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
    # collection.
    yield pvalue.TaggedOutput(
        self.OUTPUT_TAG_CHARACTER_COUNT, len(element))

    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      # yield word to add it to the main collection.
      yield word
Run Code Online (Sandbox Code Playgroud)

如您所见,对于主输出,您不需要标记元素,而是需要标记其他输出.