Joining two JSON files in Google Cloud Platform with Dataflow

Kou*_*dra 4 json google-cloud-dataflow apache-beam

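I want to find the female employees from two different JSON files, select only the fields we are interested in, and write the output to another JSON file.

I am also trying to implement this on Google Cloud Platform using Dataflow. Can someone provide sample Java code that achieves this result?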

Employee JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}

Department JSON

{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}

The expected output JSON file should look like this:

{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}

Pab*_*blo 9

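You can do this with CoGroupByKey (which involves a shuffle) or, if your departments collection is significantly smaller, with side inputs.

I will give you the code in Python, but you can build the same pipeline in Java.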


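With side inputs, you would:

  1. Convert your departments PCollection into a dictionary that maps dept_id to the department JSON dictionary.

  2. Then take the employees PCollection as the main input and, for each employee, use the department id to look up the matching department JSON in that dictionary.

Like so: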

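import apache_beam as beam

# LoadDepts() and LoadEmps() stand in for transforms that read and parse each JSON file.
departments = (p | LoadDepts()
                 | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

deps_si = beam.pvalue.AsDict(departments)

employees = (p | LoadEmps())

def join_emp_dept(employee, dept_dict):
  # dict.update() returns None, so build and return a merged copy instead.
  # In the sample data the employee's department id is stored under 'emp_dept'.
  joined = dict(employee)
  joined.update(dept_dict[employee['emp_dept']])
  return joined

joined_dicts = employees | beam.Map(join_emp_dept, dept_dict=deps_si)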
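To check the join logic without running a pipeline, here is a minimal plain-Python sketch of what the side-input lookup computes on records abbreviated from the question's sample data. It does not use Beam: the dictionary `dept_by_id` is an illustrative stand-in for the `AsDict` side input, and the record lists are shortened by hand.

```python
# Plain-Python sketch (no Beam) of the lookup that the side input performs.
# Records are abbreviated from the question's sample data.
departments = [
    {"dept_id": "OrgDept#1", "dept_name": "Account", "dept_start_year": "1950"},
    {"dept_id": "OrgDept#3", "dept_name": "HR", "dept_start_year": "1950"},
]
employees = [
    {"emp_id": "OrgEmp#1", "emp_name": "Adam", "emp_dept": "OrgDept#1", "emp_gender": "female"},
    {"emp_id": "OrgEmp#1", "emp_name": "Scott", "emp_dept": "OrgDept#3", "emp_gender": "male"},
]

# Stand-in for AsDict: dept_id -> department record.
dept_by_id = {dept["dept_id"]: dept for dept in departments}

def join_emp_dept(employee, dept_dict):
    # Merge into a new dict so the input record is not mutated.
    return {**employee, **dept_dict[employee["emp_dept"]]}

joined = [join_emp_dept(emp, dept_by_id) for emp in employees]
print(joined[0]["emp_name"], joined[0]["dept_name"])  # prints: Adam Account
```

Because every worker needs the whole dictionary, this approach fits best when the departments collection is small, as noted above.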

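With CoGroupByKey, you can group both collections using dept_id as the key. This results in a PCollection of key-value pairs where the key is the dept_id and the value holds two iterables: the departments with that id, and the employees in that department.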
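To make that shape concrete, the following plain-Python sketch mimics the grouping on small in-memory lists. It is not Beam code: `co_group_by_key` is a hypothetical helper written for illustration, and the records are cut down to one field each.

```python
from collections import defaultdict

def co_group_by_key(tagged):
    """Mimic the output shape of CoGroupByKey for small in-memory lists.

    `tagged` maps a tag name to a list of (key, value) pairs.
    """
    grouped = defaultdict(lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)

employees = [("OrgDept#1", {"emp_name": "Adam"}),
             ("OrgDept#3", {"emp_name": "Scott"})]
departments = [("OrgDept#1", {"dept_name": "Account"}),
               ("OrgDept#2", {"dept_name": "IT"}),
               ("OrgDept#3", {"dept_name": "HR"})]

result = co_group_by_key({"employees": employees, "departments": departments})
# result["OrgDept#2"] has a department but an empty employees list,
# so taking the product of the two lists yields nothing for that key.
```

The Beam version of the same grouping follows.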

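import itertools

departments = (p | LoadDepts()
               | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

# In the sample data the employee's department id is stored under 'emp_dept'.
employees = (p | LoadEmps()
               | 'key_emp' >> beam.Map(lambda emp: (emp['emp_dept'], emp)))

def join_lists(kv):
  # kv is (dept_id, {'employees': [...], 'departments': [...]}).
  _, v = kv
  return itertools.product(v['employees'], v['departments'])

def merge_dicts(pair):
  # dict.update() returns None, so return a merged copy instead.
  emp_dict, dept_dict = pair
  return {**emp_dict, **dept_dict}

joined_dicts = (
    {'employees': employees, 'departments': departments}
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergedicts' >> beam.Map(merge_dicts)
    # filter_fields is not defined here; it would keep only the fields of interest.
    | 'filterfields' >> beam.Map(filter_fields)
)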
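The pipeline above refers to `filter_fields` without defining it, and neither snippet filters by gender. A minimal sketch of those pieces might look like the following. The function bodies, the `WANTED_FIELDS` constant, and the helper names `parse_json_line`, `is_female`, and `to_json_line` are assumptions based on the field names in the question, not part of the original answer.

```python
import json

# Output fields taken from the expected output shown in the question.
WANTED_FIELDS = ("emp_id", "emp_name", "dept_name", "emp_salary")

def parse_json_line(line):
    # Each input line is assumed to be one JSON object.
    return json.loads(line)

def is_female(record):
    return record.get("emp_gender") == "female"

def filter_fields(record):
    # Keep only the wanted fields, in the order listed above.
    return {field: record[field] for field in WANTED_FIELDS}

def to_json_line(record):
    # Compact separators match the spacing of the sample files.
    return json.dumps(record, separators=(",", ":"))

# A joined record as the merge step would produce it for the first sample employee.
joined = {
    "emp_id": "OrgEmp#1", "emp_name": "Adam", "emp_dept": "OrgDept#1",
    "emp_country": "USA", "emp_gender": "female", "emp_birth_year": "1980",
    "emp_salary": "$100000", "dept_id": "OrgDept#1", "dept_name": "Account",
    "dept_start_year": "1950",
}

if is_female(joined):
    output_line = to_json_line(filter_fields(joined))
    print(output_line)
```

In a pipeline, these plain functions could be wired in with `beam.Filter(is_female)` on the employees before the join, `beam.Map(parse_json_line)` after `beam.io.ReadFromText`, and `beam.Map(to_json_line)` before `beam.io.WriteToText`.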