Kou*_*dra 4 json google-cloud-dataflow apache-beam
我想从两个不同的JSON文件中找出女性员工,只选择我们感兴趣的字段并将输出写入另一个JSON.
此外,我正在尝试使用Dataflow在Google的云平台上实现它.有人可以提供任何可以实现的示例Java代码来获得结果.
员工JSON
{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}
Run Code Online (Sandbox Code Playgroud)
部门JSON
{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}
Run Code Online (Sandbox Code Playgroud)
预期的输出JSON文件应该是这样的
{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}
Run Code Online (Sandbox Code Playgroud)
CoGroupByKey如果您的部门集合明显较小,则可以使用(将使用随机播放)或使用侧面输入来执行此操作.
我将在Python中为您提供代码,但您可以在Java中使用相同的管道.
通过侧输入,您将:
将您的部门PCollection转换为将dept_id映射到部门JSON字典的字典.
然后,您将员工PCollection作为主要输入,您可以使用dept_id为部门PCollection中的每个部门获取JSON.
像这样:
departments = (p | LoadDepts()
| 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))
deps_si = beam.pvalue.AsDict(departments)
employees = (p | LoadEmps())
def join_emp_dept(employee, dept_dict):
return employee.update(dept_dict[employee['dept_id']])
joined_dicts = employees | beam.Map(join_dicts, dept_dict=deps_si)
Run Code Online (Sandbox Code Playgroud)
使用CoGroupByKey,您可以使用dept_id作为键来对两个集合进行分组.这将导致PCollection的键值对,其中键是dept_id,值是部门的两个可迭代,以及该部门中的员工.
departments = (p | LoadDepts()
| 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))
employees = (p | LoadEmps()
| 'key_emp' >> beam.Map(lambda emp: (emp['dept_id'], emp)))
def join_lists((k, v)):
itertools.product(v['employees'], v['departments'])
joined_dicts = (
{'employees': employees, 'departments': departments}
| beam.CoGroupByKey()
| beam.FlatMap(join_lists)
| 'mergedicts' >> beam.Map(lambda (emp_dict, dept_dict): emp_dict.update(dept_dict))
| 'filterfields'>> beam.Map(filter_fields)
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1512 次 |
| 最近记录: |