Han*_*eng 5 python google-cloud-dataflow apache-beam apache-beam-io
See the code snippet below. I want ["metric1", "metric2"] to be the input of RunTask.process. However, it was run twice, once with "metric1" and once with "metric2".
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run():
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=pipeline_options)
    root = p | 'Get source' >> beam.Create([
        "source_name"  # maybe ["source_name"] makes more sense since my process function takes an array as an input?
    ])
    metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1"))  # let's say it returns ["metric1"]
    metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2"))  # let's say it returns ["metric2"]
    # I want ["metric1", "metric2"] to be my input for RunTask.process.
    # However, it was run twice, with "metric1" and "metric2" respectively.
    metric3 = (metric1, metric2) | beam.Flatten() | beam.ParDo(RunTask())
Ale*_*aes 10
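As I understand it, you want to combine the two PCollections so that the result has the form ['element1','element2']. To achieve this, you can use CoGroupByKey() instead of Flatten(). Keep in mind that CoGroupByKey() only works on PCollections of (key, value) tuples, so metric1 and metric2 must both be keyed, with the same key, for their values to end up in the same output element.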
Applied to your code snippet, it would look like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run():
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=pipeline_options)
    root = p | 'Get source' >> beam.Create([
        "source_name"  # maybe ["source_name"] makes more sense since my process function takes an array as an input?
    ])
    metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1"))  # let's say it returns ["metric1"]
    metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2"))  # let's say it returns ["metric2"]
    metric3 = (
        (metric1, metric2)
        # CoGroupByKey needs (key, value) elements in both inputs, so this assumes
        # RunLongCompute emits keyed tuples sharing one key, e.g. ("source_name", "metric1").
        | beam.CoGroupByKey()
        | beam.ParDo(RunTask())
    )
I would like to point out the difference between Flatten() and CoGroupByKey().
1) Flatten() takes two or more PCollections that store the same data type and merges them into a single logical PCollection. For example:
import apache_beam as beam

p = beam.Pipeline()

address_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

street = p | 'CreateStreets' >> beam.Create(address_list)
city = p | 'CreateCities' >> beam.Create(city_list)

result = (
    (street, city)
    | beam.Flatten()     # one PCollection containing the elements of both inputs
    | beam.Map(print)
)

p.run().wait_until_finish()
And the output:
('leo', 'George St. 32')
('ralph', 'Pyrmont St. 30')
('mary', '10th Av.')
('carly', 'Marina Bay 1')
('leo', 'Sydney')
('ralph', 'Sydney')
('mary', 'NYC')
('carly', 'Brisbane')
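Note that both PCollections appear in the output, but the elements of one are simply appended to those of the other. Nothing is matched up, and every element still reaches the next step on its own (the order in which they are printed is not guaranteed).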
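2) CoGroupByKey() performs a relational join between two or more key/value PCollections that have the same key type. Instead of appending the elements, as Flatten() does, it joins them by key. Here is an example: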
import apache_beam as beam

p = beam.Pipeline()

address_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

street = p | 'CreateStreets' >> beam.Create(address_list)
city = p | 'CreateCities' >> beam.Create(city_list)

results = (
    (street, city)
    | beam.CoGroupByKey()   # one element per key: (key, (streets, cities))
    | beam.Map(print)
    # | beam.io.WriteToText('delete.txt')
)

p.run().wait_until_finish()
And the output:
('leo', (['George St. 32'], ['Sydney']))
('ralph', (['Pyrmont St. 30'], ['Sydney']))
('mary', (['10th Av.'], ['NYC']))
('carly', (['Marina Bay 1'], ['Brisbane']))
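Note that you need a common key to join the results. Also, this output is exactly the kind you were expecting: all values for a key arrive together in a single element.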