将列中的列从一个数据帧添加到另一个数据帧

You*_*man 2 scala dataframe apache-spark

我有两个具有相同行数的DataFrame,但列数根据来源不同而且是动态的.

第一个DataFrame包含所有列,但第二个DataFrame被过滤和处理,其中没有所有其他列.

需要从第一个DataFrame中选择特定列,并添加/合并第二个DataFrame.

val sourceDf = spark.read.load(parquetFilePath)
val resultDf = spark.read.load(resultFilePath)

val columnName :String="Col1"
Run Code Online (Sandbox Code Playgroud)

我尝试以几种方式添加,在这里我只是给了一个......

val modifiedResult = resultDf.withColumn(columnName, sourceDf.col(columnName))

val modifiedResult = resultDf.withColumn(columnName, sourceDf(columnName))
val modifiedResult = resultDf.withColumn(columnName, labelColumnUdf(sourceDf.col(columnName)))
Run Code Online (Sandbox Code Playgroud)

这些都不起作用.

你可以帮我解决这个问题,将第一个DataFrame的第二个DataFrame合并/添加到第一个DataFrame.

给出的示例不是我需要的确切数据结构,但它将满足我解决此问题的要求.

输入输出样本:

Source DataFrame:
+---+------+---+
|InputGas|
+---+------+---+
|1000|
|2000|
|3000|
|4000|
+---+------+---+

Result DataFrame:
+---+------+---+
| Time|CalcGas|Speed|
+---+------+---+
|  0 | 111| 1111|
|  0 | 222| 2222|
|  1 | 333| 3333|
|  2 | 444| 4444|
+---+------+---+

Expected Output:
+---+------+---+
|Time|CalcGas|Speed|InputGas|
+---+------+---+---+
|  0|111 | 1111 |1000|
|  0|222 | 2222 |2000|
|  1|333 | 3333 |3000|
|  2|444 | 4444 |4000|
+---+------+---+---+
Run Code Online (Sandbox Code Playgroud)

Pra*_*ode 9

一种方法来实现这一目标 join

如果您在两个数据框中都有一些公共列,那么您可以在该列上执行连接并获得您想要的结果.

例:

import sparkSession.sqlContext.implicits._

val df1 = Seq((1, "Anu"),(2, "Suresh"),(3, "Usha"), (4, "Nisha")).toDF("id","name")
val df2 = Seq((1, 23),(2, 24),(3, 24), (4, 25), (5, 30), (6, 32)).toDF("id","age")

val df = df1.as("df1").join(df2.as("df2"), df1("id") === df2("id")).select("df1.id", "df1.name", "df2.age")
df.show()
Run Code Online (Sandbox Code Playgroud)

输出:

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|   Anu| 23|
|  2|Suresh| 24|
|  3|  Usha| 24|
|  4| Nisha| 25|
+---+------+---+
Run Code Online (Sandbox Code Playgroud)

更新:

如果您在两个数据框中都没有任何唯一的ID,那么创建一个并使用它.

import sparkSession.sqlContext.implicits._
import org.apache.spark.sql.functions._

var sourceDf = Seq(1000, 2000, 3000, 4000).toDF("InputGas")
var resultDf  = Seq((0, 111, 1111), (0, 222, 2222), (1, 333, 3333), (2, 444, 4444)).toDF("Time", "CalcGas", "Speed")

sourceDf = sourceDf.withColumn("rowId1", monotonically_increasing_id())
resultDf = resultDf.withColumn("rowId2", monotonically_increasing_id())

val df = sourceDf.as("df1").join(resultDf.as("df2"), sourceDf("rowId1") === resultDf("rowId2"), "inner").select("df1.InputGas", "df2.Time", "df2.CalcGas", "df2.Speed")
df.show()
Run Code Online (Sandbox Code Playgroud)

输出:

+--------+----+-------+-----+
|InputGas|Time|CalcGas|Speed|
+--------+----+-------+-----+
|    1000|   0|    111| 1111|
|    2000|   0|    222| 2222|
|    3000|   1|    333| 3333|
|    4000|   2|    444| 4444|
+--------+----+-------+-----+
Run Code Online (Sandbox Code Playgroud)

  • 请注意,这并不总是有效(尽管对于较小的数据帧也是如此)。`monotonically_increasing_id`仅保证数字在增加,不保证使用哪个数字。因此,赋予两个数据帧的编号可以不同。 (2认同)

归档时间:

查看次数:

13528 次

最近记录:

8 年,1 月 前