我试图找出 Map 和 ParDo 之间的性能差异,但我无法以某种方式运行 ParDo 方法
我已经尝试寻找一些尝试解决问题的资源,但我没有找到
ParDo 方法(这不起作用):
class ci(beam.DoFn):
def compute_interest(self,data_item):
cust_id, cust_data = data_item
if(cust_data['basic'][0]['acc_opened_date']=='2010-10-10'):
new_data = {}
new_data['new_balance'] = (cust_data['account'][0]['cur_bal'] * cust_data['account'][0]['roi']) / 100
new_data.update(cust_data['account'][0])
new_data.update(cust_data['basic'][0])
del new_data['cur_bal']
return new_data
Run Code Online (Sandbox Code Playgroud)
地图方法(这有效):
def compute_interest(data_item):
cust_id, cust_data = data_item
if(cust_data['basic'][0]['acc_opened_date']=='2010-10-10'):
new_data = {}
new_data['new_balance'] = (cust_data['account'][0]['cur_bal'] * cust_data['account'][0]['roi']) / 100
new_data.update(cust_data['account'][0])
new_data.update(cust_data['basic'][0])
del new_data['cur_bal']
return new_data
Run Code Online (Sandbox Code Playgroud)
错误:
引发 NotImplementedError RuntimeError: NotImplementedError [运行 'PIPELINE NAME']
Beam.DoFn期待一个process方法:
def process(self, element):
Run Code Online (Sandbox Code Playgroud)
如Beam 编程指南的第 4.2.1.2 节所述:
在 DoFn 子类中,您将编写一个方法流程,在其中提供实际的处理逻辑。您不需要手动从输入集合中提取元素;Beam SDK 会为您处理这些问题。您的流程方法应该接受元素类型的对象。这是输入元素,输出是通过在 process 方法中使用 yield 或 return 语句发出的。
作为示例,我们将同时定义Map和ParDo函数:
def compute_interest_map(data_item):
return data_item + 1
class compute_interest_pardo(beam.DoFn):
def process(self, element):
yield element + 2
Run Code Online (Sandbox Code Playgroud)
如果您更改process另一个方法名称,您将获得NotImplementedError.
主要管道将是:
events = (p
| 'Create' >> beam.Create([1, 2, 3]) \
| 'Add 1' >> beam.Map(lambda x: compute_interest_map(x)) \
| 'Add 2' >> beam.ParDo(compute_interest_pardo()) \
| 'Print' >> beam.ParDo(log_results()))
Run Code Online (Sandbox Code Playgroud)
输出:
INFO:root:>> Interest: 4
INFO:root:>> Interest: 5
INFO:root:>> Interest: 6
Run Code Online (Sandbox Code Playgroud)