Raj*_*ita 22 java scala spark-dataframe
val rdd = sc.parallelize(Seq(("vskp", Array(2.0, 1.0, 2.1, 5.4)),("hyd",Array(1.5, 0.5, 0.9, 3.7)),("hyd", Array(1.5, 0.5, 0.9, 3.2)),("tvm", Array(8.0, 2.9, 9.1, 2.5))))
val df1= rdd.toDF("id", "vals")
val rdd1 = sc.parallelize(Seq(("vskp","ap"),("hyd","tel"),("bglr","kkt")))
val df2 = rdd1.toDF("id", "state")
val df3 = df1.join(df2,df1("id")===df2("id"),"left")
Run Code Online (Sandbox Code Playgroud)
连接操作工作正常,但是当我重用df2时,我面临未解决的属性错误
val rdd2 = sc.parallelize(Seq(("vskp", "Y"),("hyd", "N"),("hyd", "N"),("tvm", "Y")))
val df4 = rdd2.toDF("id","existance")
val df5 = df4.join(df2,df4("id")===df2("id"),"left")
Run Code Online (Sandbox Code Playgroud)
错误:org.apache.spark.sql.AnalysisException:已解析的属性ID#426
Eri*_*low 21
正如我在评论中提及,它关系到https://issues.apache.org/jira/browse/SPARK-10925,更具体地说https://issues.apache.org/jira/browse/SPARK-14948.重复使用引用会在命名时产生歧义,因此您必须克隆df - 有关示例,请参阅https://issues.apache.org/jira/browse/SPARK-14948中的最后一条注释.
小智 8
这个问题真的浪费了我很多时间,我终于找到了一个简单的解决方案。
在 PySpark 中,对于有问题的列,比如说colA,我们可以简单地使用
import pyspark.sql.functions as F
df = df.select(F.col("colA").alias("colA"))
Run Code Online (Sandbox Code Playgroud)
之前,使用df在join。
我认为这也适用于 Scala/Java Spark。
小智 6
只需重命名您的列并输入相同的名称。在 pyspark 中:对于 df.columns 中的 i:df = df.withColumnRenamed(i,i)
如果您有df1和从df1派生的df2,请尝试重命名df2中的所有列,以便连接后没有两个列具有相同的名称。所以在加入之前:
所以代替 df1.join(df2...
做
# Step 1 rename shared column names in df2.
df2_renamed = df2.withColumnRenamed('columna', 'column_a_renamed').withColumnRenamed('columnb', 'column_b_renamed')
# Step 2 do the join on the renamed df2 such that no two columns have same name.
df1.join(df2_renamed)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
25071 次 |
| 最近记录: |