IYY*_*IYY 1 google-cloud-dataflow apache-beam
由于文档仅适用于JAVA,我无法理解其含义.
它声明 - "虽然ParDo总是产生一个主输出PCollection(作为应用的返回值),你也可以让你的ParDo产生任意数量的额外输出PCollections.如果你选择有多个输出,你的ParDo将返回所有的输出PCollections(包括主输出)捆绑在一起.例如,在Java中,输出PCollections捆绑在一个类型安全的PCollectionTuple中."
我理解捆绑在一起意味着什么,但如果我在我的DoFn中产生一个标签,它是否会产生一个包含所有其他输出的空包,并在代码中遇到它们时产生其他输出?或者它等待所有产量准备好输入并将它们全部输出到一起?
文档中没有太多清晰度.虽然我认为它不会等待,只是遇到收益,但我仍然需要了解发生了什么.
回答这个问题的最佳方法是举个例子.这个例子在Beam中可用.
假设您要运行字数统计管道(例如,计算每个单词在文档中出现的次数).为此,您需要将文件中的行拆分为单个单词.考虑一下,您还需要单独计算字长.你的分裂变换是这样的:
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input) # Read in the file
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
split_lines_result = (lines
| beam.ParDo(SplitLinesToWordsFn()).with_outputs(
SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
main='words'))
short_words = split_lines_result['words']
character_count = split_lines_result[
SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]
Run Code Online (Sandbox Code Playgroud)
在这种情况下,每个都是不同的PCollection,具有正确的元素.在DoFn将负责分裂其输出,并且它由元素标记做的.看到:
class SplitLinesToWordsFn(beam.DoFn):
OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
def process(self, element):
# yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
# collection.
yield pvalue.TaggedOutput(
self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
words = re.findall(r'[A-Za-z\']+', element)
for word in words:
# yield word to add it to the main collection.
yield word
Run Code Online (Sandbox Code Playgroud)
如您所见,对于主输出,您不需要标记元素,而是需要标记其他输出.
| 归档时间: |
|
| 查看次数: |
1796 次 |
| 最近记录: |