检查 PCollection 是否为空 - Apache Beam

ris*_*097 1 google-cloud-dataflow apache-beam

有没有办法检查 PCollection 是否为空?

我在 Dataflow 和 Apache Beam 的文档中没有找到任何相关内容。

Mar*_*cki 6

你没有指定你正在使用哪个 SDK,所以我假设是 Python。该代码很容易移植到 Java。

您可以应用元素的全局计数,然后通过应用简单比较将数值映射到布尔值。您将能够使用pvalue.AsSingleton函数侧面输入该值,如下所示:

import apache_beam as beam
from apache_beam import pvalue

is_empty_check = (your_pcollection
                    | "Count" >> beam.combiners.Count.Globally()
                    | "Is empty?" >> beam.Map(lambda n: n == 0)
                    )

another_pipeline_branch = (
    p
    | beam.Map(do_something, is_empty=pvalue.AsSingleton(is_empty_check))
)
Run Code Online (Sandbox Code Playgroud)

侧面输入的用法如下:

def do_something(element, is_empty):
    if is_empty:
        # yes
    else:
        # no
Run Code Online (Sandbox Code Playgroud)