Vij*_*jay 2 scala join dataframe apache-spark apache-spark-sql
我有两个数据帧 df1(员工表)和 df2(部门表),其架构如下:
df1.columns
// Arrays(id,name,dept_id)
Run Code Online (Sandbox Code Playgroud)
和
df2.columns
// Array(id,name)
Run Code Online (Sandbox Code Playgroud)
在我将这两个表加入 df1.dept_id 和 df2.id 之后:
val joinedData = df1.join(df2,df1("dept_id")===df2("id"))
joinedData.columns
// Array(id,name,dept_id,id,name)
Run Code Online (Sandbox Code Playgroud)
在将其保存在文件中时,
joined.write.csv("<path>")
Run Code Online (Sandbox Code Playgroud)
它给出了错误:
org.apache.spark.sql.AnalysisException: Duplicate column(s) : "name", "id" found, cannot save to file.;
Run Code Online (Sandbox Code Playgroud)
我读过使用字符串序列来避免列重复,但这是针对要执行连接的列。我需要非连接列的类似功能。
有没有直接的方法可以将表名嵌入到重复列中以便保存?
我想出了一个匹配 dfs 列并重命名重复列以将表名附加到列名的解决方案。但是有直接的方法吗?
注意:这将是一个通用代码,仅包含执行连接的列详细信息。其余列仅在运行时已知。所以我们不能通过硬编码来重命名列。
我会通过确保它们具有不同的名称来保留所有列,例如通过在列名前添加一个标识符:
val df1Cols = df1.columns
val df2Cols = df2.columns
// prefixes to column names
val df1pf = df1.select(df1Cols.map(n => col(n).as("df1_"+n)):_*)
val df2pf = df2.select(df2Cols.map(n => col(n).as("df2_"+n)):_*)
df1pf.join(df2pf,
$"df1_dept_id"===$"df2_id",
)
Run Code Online (Sandbox Code Playgroud)