从Apache Beam管道收集输出并将其显示到控制台

Sha*_*lam 5 apache-beam

我从事Apache Beam已有两天了。我想快速迭代正在使用的应用程序,并确保正在构建的管道没有错误。在spark中,我们可以使用sc.parallelise,当我们执行某些操作时,我们可以获得可以检查的值。

同样,当我阅读有关Apache Beam的内容时,我发现我们可以PCollection使用以下语法创建一个并使用它

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines 
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda (w, o): (w, sum(o))))
    result = pipeline.run()
Run Code Online (Sandbox Code Playgroud)

我实际上想将结果打印到控制台。但是我找不到关于它的任何文档。

有没有一种方法可以将结果打印到控制台,而不是每次都将其保存到文件中?

Oli*_*ver 10

您不需要临时列表。在 python 2.7 中,以下内容就足够了:

def print_row(row):
    print row

(pipeline 
    | ...
    | "print" >> beam.Map(print_row)
)

result = pipeline.run()
result.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

在 python 3.x 中,print是一个函数,所以以下就足够了:

(pipeline 
    | ...
    | "print" >> beam.Map(print)
)

result = pipeline.run()
result.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

  • 请注意,如果您尝试将其添加到管道中间,则可能会从管道中收到错误“TypeError: 'NoneType' object is not subscriptable”。这是因为 `print` 返回 `None`,它会传递给您的以下指令。在这种情况下,您将需要一些不同的代码来打印该值然后返回它。 (6认同)

Sha*_*lam 6

在进一步探索并了解如何为我的应用程序编写测试用例之后,我找到了将结果打印到控制台的方法。请注意,我现在正在将所有内容运行到单节点机器上,并试图了解 apache beam 提供的功能,以及如何在不影响行业最佳实践的情况下采用它。

所以,这是我的解决方案。在我们管道的最后阶段,我们可以引入一个 map 函数,该函数将结果打印到控制台或将结果累积到一个变量中,然后我们可以打印变量以查看值

import apache_beam as beam

# lets have a sample string
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
    | "split" >> beam.ParDo(lambda row: row.split(" "))
    | "pair" >> beam.Map(lambda w: (w, 1))
    | "group" >> beam.CombinePerKey(sum))

# lets collect our result with a map transformation into output array
output = []
def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# Run the pipeline
result = pipeline.run()

# lets wait until result a available
result.wait_until_finish()

# print the output
print output
Run Code Online (Sandbox Code Playgroud)

  • 好主意,但如果您的管道以分布式方式执行,例如在 Apache Yarn (Hadoop) 或 Google Dataflow 中,这将不起作用。必须有另一种方法来收集结果。但我仍在寻找它。 (7认同)