为什么 Map 有效而 ParDo 无效?

0 python apache-beam

我试图找出 Map 和 ParDo 之间的性能差异,但我无法以某种方式运行 ParDo 方法

我已经尝试寻找一些尝试解决问题的资源,但我没有找到

ParDo 方法(这不起作用):

class ci(beam.DoFn):
  def compute_interest(self,data_item):
    cust_id, cust_data = data_item
    if(cust_data['basic'][0]['acc_opened_date']=='2010-10-10'):
      new_data = {}
      new_data['new_balance'] = (cust_data['account'][0]['cur_bal'] * cust_data['account'][0]['roi']) / 100
      new_data.update(cust_data['account'][0])
      new_data.update(cust_data['basic'][0])
      del new_data['cur_bal']
      return new_data
Run Code Online (Sandbox Code Playgroud)

地图方法(这有效):

def compute_interest(data_item):
  cust_id, cust_data = data_item
  if(cust_data['basic'][0]['acc_opened_date']=='2010-10-10'):
    new_data = {}
    new_data['new_balance'] = (cust_data['account'][0]['cur_bal'] * cust_data['account'][0]['roi']) / 100
    new_data.update(cust_data['account'][0])
    new_data.update(cust_data['basic'][0])
    del new_data['cur_bal']
    return new_data
Run Code Online (Sandbox Code Playgroud)

错误:

引发 NotImplementedError RuntimeError: NotImplementedError [运行 'PIPELINE NAME']

Gui*_*ins 6

Beam.DoFn期待一个process方法:

def process(self, element):
Run Code Online (Sandbox Code Playgroud)

Beam 编程指南的第 4.2.1.2 节所述:

在 DoFn 子类中,您将编写一个方法流程,在其中提供实际的处理逻辑。您不需要手动从输入集合中提取元素;Beam SDK 会为您处理这些问题。您的流程方法应该接受元素类型的对象。这是输入元素,输出是通过在 process 方法中使用 yield 或 return 语句发出的。

作为示例,我们将同时定义MapParDo函数:

def compute_interest_map(data_item):
  return data_item + 1

class compute_interest_pardo(beam.DoFn):
  def process(self, element):
    yield element + 2
Run Code Online (Sandbox Code Playgroud)

如果您更改process另一个方法名称,您将获得NotImplementedError.

主要管道将是:

events = (p
  | 'Create' >> beam.Create([1, 2, 3]) \
  | 'Add 1' >> beam.Map(lambda x: compute_interest_map(x)) \
  | 'Add 2' >> beam.ParDo(compute_interest_pardo()) \
  | 'Print' >> beam.ParDo(log_results()))
Run Code Online (Sandbox Code Playgroud)

输出:

INFO:root:>> Interest: 4
INFO:root:>> Interest: 5
INFO:root:>> Interest: 6
Run Code Online (Sandbox Code Playgroud)

代码