How to visualize a Beam pipeline run with the DirectRunner

Dmi*_*lin 3 pipeline view apache-beam direct-runner

In GCP we can see the pipeline execution graph. Is it possible to get the same thing when running locally with the DirectRunner?

Anonymous 7

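You can use pipeline_graph together with InteractiveRunner (which runs on the DirectRunner by default) to get a graphviz (DOT) representation of the pipeline locally.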

Using the word-count pipeline example from the Beam documentation:

import apache_beam as beam
from apache_beam.runners.interactive.display import pipeline_graph
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import re

pipeline = beam.Pipeline(InteractiveRunner())
lines = pipeline | beam.Create([f"some_file_{i}.txt" for i in range(10)])

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (
        beam.FlatMap(
            lambda x: re.findall(r'[A-Za-z\']+', x)).with_output_types(str))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
def format_result(word_count):
    (word, count) = word_count
    return f'{word}: {count}'

output = counts | 'Format' >> beam.Map(format_result)

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | beam.io.WriteToText("some_file.txt")

print(pipeline_graph.PipelineGraph(pipeline).get_dot())

This prints:

digraph G {
node [color=blue, fontcolor=blue, shape=box];
"Create";
lines [shape=circle];
"Split";
pcoll4978 [label="", shape=circle];
"PairWithOne";
pcoll8859 [label="", shape=circle];
"GroupAndSum";
counts [shape=circle];
"Format";
output [shape=circle];
"WriteToText";
pcoll6409 [label="", shape=circle];
"Create" -> lines;
lines -> "Split";
"Split" -> pcoll4978;
pcoll4978 -> "PairWithOne";
"PairWithOne" -> pcoll8859;
pcoll8859 -> "GroupAndSum";
"GroupAndSum" -> counts;
counts -> "Format";
"Format" -> output;
output -> "WriteToText";
"WriteToText" -> pcoll6409;
}

Pasting this into https://edotor.net produces:

[Image: rendered Beam pipeline graph]

If needed, you can use GraphViz from Python (for example, the graphviz package) to produce nicer-looking output.
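To skip the online editor and render locally, you can hand the DOT text to Graphviz directly. The sketch below swaps the graphviz Python package for a plain subprocess call to the Graphviz dot command-line tool, so it needs only the standard library plus a Graphviz install on PATH. The helper name render_dot, the output file name, and the trimmed SAMPLE_DOT string are illustrative assumptions; in practice you would pass in the string returned by get_dot().

```python
import shutil
import subprocess
from pathlib import Path
from typing import Optional

# Trimmed copy of the DOT text printed above (illustrative input only).
SAMPLE_DOT = """digraph G {
node [color=blue, fontcolor=blue, shape=box];
"Create";
lines [shape=circle];
"Split";
"Create" -> lines;
lines -> "Split";
}
"""


def render_dot(dot_source: str, out_path: str, fmt: str = "svg") -> Optional[Path]:
    """Hypothetical helper: render DOT text to an image via the `dot` CLI.

    Returns the output path, or None if Graphviz's `dot` is not on PATH.
    """
    dot_exe = shutil.which("dot")
    if dot_exe is None:
        return None  # Graphviz not installed; nothing to render with.
    out = Path(out_path)
    # `dot -T<fmt> -o <file>` reads the graph from stdin and writes the image.
    subprocess.run(
        [dot_exe, f"-T{fmt}", "-o", str(out)],
        input=dot_source,
        text=True,
        check=True,
    )
    return out


if __name__ == "__main__":
    result = render_dot(SAMPLE_DOT, "beam_pipeline.svg")
    print(result if result else "Graphviz `dot` not found on PATH")
```

With the graphviz package instead, graphviz.Source(dot_text, format="svg").render("beam_pipeline") should do roughly the same, since that package also calls the dot executable under the hood.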