Filter based on another RDD in Spark

poi*_*rez 13 python scala apache-spark

I want to keep only the employees whose department ID appears in the second table.

Employee table
LastName    DepartmentID
Rafferty    31
Jones       33
Heisenberg  33
Robinson    34
Smith       34

Department table
DepartmentID
31  
33  

I tried the following code, which doesn't work:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()

Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

Any ideas? I'm using Spark 1.1.0 with Python, but I'll accept an answer in either Scala or Python.

maa*_*asg 23

What you want to achieve here is to filter each partition using the data contained in the department table. (Your original code fails because an RDD cannot be referenced from inside a transformation on another RDD; the small table has to be materialized on the driver first.) This would be the basic solution:

val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)}

If your department data is large, a broadcast variable will improve performance by shipping the data to all nodes once, instead of serializing it with every task:

val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)}

While using a join would work, it's a very expensive solution, as it requires a distributed shuffle of the data (byKey) to realize the join. Given that the requirement is a simple filter, sending the data to each partition (as shown above) will deliver much better performance.
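For readers on the Python side, the driver-side logic of both snippets above can be sketched in plain Python (no Spark required; the data is taken from the question and the names are illustrative):

```python
# Plain-Python sketch of the broadcast-filter logic: the small department
# table is collected into a set on the driver, and each worker would then
# consult that set during the filter.
employees = [('Raffery', 31), ('Jones', 33), ('Heisenberg', 33),
             ('Robinson', 34), ('Smith', 34)]
departments = [31, 33]

dept_set = set(departments)  # analogous to deptRdd.collect.toSet / deptBC.value
valid = [(name, d) for (name, d) in employees if d in dept_set]
print(valid)  # [('Raffery', 31), ('Jones', 33), ('Heisenberg', 33)]
```

In PySpark the same shape applies: `sc.broadcast(set(department.collect()))` on the driver, then `employee.filter(lambda e: e[1] in bc.value)`.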


poi*_*rez 10

I finally implemented a solution using a join. I had to add a dummy 0 value to the departments to avoid an exception from Spark:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
# invert id and name to get id as the key
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0]))
# add a 0 value to avoid an exception
department = sc.parallelize(department).map(lambda d: (d,0))

employee.join(department).map(lambda e: (e[1][0], e[0])).collect()

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)]
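The join-based approach can be traced step by step in plain Python (illustrative only; Spark's actual output order depends on partitioning, which is why the collected result above comes back in a different order):

```python
# Plain-Python walk-through of the join-based solution.
employees = [('Raffery', 31), ('Jones', 33), ('Heisenberg', 33),
             ('Robinson', 34), ('Smith', 34)]
departments = [31, 33]

emp_pairs = [(d, name) for (name, d) in employees]  # key employees by department id
dept_values = dict((d, 0) for d in departments)     # dummy 0 makes it a (key, value) pair

# A join keeps only keys present on both sides, pairing up their values.
joined = [(d, (name, dept_values[d])) for (d, name) in emp_pairs if d in dept_values]

# The final map extracts (name, department_id) from each joined record.
result = [(v[0], k) for (k, v) in joined]
print(result)  # [('Raffery', 31), ('Jones', 33), ('Heisenberg', 33)]
```

The dummy value exists only because `join` operates on pair RDDs; it is discarded by the final map.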