我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.
在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sa和sb.每个Row包含name,id_sa和id_sb.我的目标是从生产映射id_sa到id_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.
让我们试着用一个例子来澄清.如果我有以下行:
[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)
我的目标是从生产映射a1到b2.事实上,相关的名称a1是n1,n2和n3,分别映射b1,b2和b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.
我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)
但也许我正在尝试使用错误的工具,我应该回到使用RDD.
我正在寻求对此的澄清.我在下面写两个问题:
我们有一个员工姓名表,列ID,姓名,薪水
1. Select name from employee
where sum(salary) > 1000 ;
2. Select name from employee
where substring_index(name,' ',1) = 'nishant' ;
Run Code Online (Sandbox Code Playgroud)
查询1不起作用,但查询2确实有效.根据我的开发经验,我觉得可能的解释是:
sum()适用于参数中指定的一组值.这里'salary'列被传递,因此它必须将此列的所有值相加.但是在where子句中,记录是逐个检查的,就像检查第一个记录1一样,等等.因此,sum(salary)将不会被计算,因为它需要访问所有列值,然后只返回一个值.
查询2作为substring_index()工作在单个值上,因此它在提供给它的值上工作.
你能否证实我的理解.
我在StackOverflow上发布了关于返回由另一列分组的列的最大值的帖子,并得到了一个意外的Java异常.
这是测试数据:
import pyspark.sql.functions as f
data = [('a', 5), ('a', 8), ('a', 7), ('b', 1), ('b', 3)]
df = spark.createDataFrame(data, ["A", "B"])
df.show()
+---+---+
| A| B|
+---+---+
| a| 5|
| a| 8|
| a| 7|
| b| 1|
| b| 3|
+---+---+
Run Code Online (Sandbox Code Playgroud)
以下是据称适用于其他用户的解决方案:
from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
.where(f.col('B') == f.col('maxB'))\
.drop('maxB').show()
Run Code Online (Sandbox Code Playgroud)
哪个应该产生这个输出:
#+---+---+
#| A| B|
#+---+---+
#| a| 8|
#| b| 3|
#+---+---+
Run Code Online (Sandbox Code Playgroud)
相反,我得到:
java.lang.UnsupportedOperationException: Cannot evaluate expression: max(input[2, …Run Code Online (Sandbox Code Playgroud)