如何在 Pyspark 中按列连接/附加多个 Spark 数据帧?

Geo*_*eRF 5 python apache-spark apache-spark-sql pyspark pyspark-sql

如何使用 Pyspark 数据框做相当于 pd.concat([df1,df2],axis='columns') 的 Pandas?我用谷歌搜索并找不到一个好的解决方案。

DF1
var1        
     3      
     4      
     5      

DF2
var2    var3     
  23      31
  44      45
  52      53

Expected output dataframe
var1        var2    var3
     3        23      31
     4        44      45
     5        52      53
Run Code Online (Sandbox Code Playgroud)

编辑以包括预期的输出

Kub*_*tun 10

我花了几个小时使用 PySpark 来完成此操作,我的工作解决方案如下;(顺便说一句,Python 相当于 @Shankar Koirala 的答案)

from pyspark.sql.functions import monotonically_increasing_id

DF1 = df2.withColumn("row_id", monotonically_increasing_id())
DF2 = df3.withColumn("row_id", monotonically_increasing_id())
result_df = DF1.join(DF2, ("row_id")).drop("row_id")
Run Code Online (Sandbox Code Playgroud)

您只需为两个数据帧定义一个公共列,并在合并后立即删除该列。我希望这个解决方案在数据帧不包含任何公共列的情况下有所帮助。

但是,此方法随机连接数据帧行,这是一个需要记住的细节。


Dev*_*evi 5

等效的接受答案使用pyspark将是

from pyspark.sql.types import StructType

spark = SparkSession.builder().master("local").getOrCreate()
df1 = spark.sparkContext.parallelize([(1, "a"),(2, "b"),(3, "c")]).toDF(["id", "name"])
df2 = spark.sparkContext.parallelize([(7, "x"),(8, "y"),(9, "z")]).toDF(["age", "address"])

schema = StructType(df1.schema.fields + df2.schema.fields)
df1df2 = df1.rdd.zip(df2.rdd).map(lambda x: x[0]+x[1])
spark.createDataFrame(df1df2, schema).show()
Run Code Online (Sandbox Code Playgroud)

  • 我遇到了同样的问题,@Devi 的解决方案效果很好。然而,我觉得很奇怪,为什么这样的基本操作在 PySpark 中会如此困难,而不是直接实现。 (2认同)

Sha*_*ala 4

下面是您想要做的示例,但在 scala 中,我希望您可以将其转换为 pyspark

val spark = SparkSession
    .builder()
    .master("local")
    .appName("ParquetAppendMode")
    .getOrCreate()
  import spark.implicits._

  val df1 = spark.sparkContext.parallelize(Seq(
    (1, "abc"),
    (2, "def"),
    (3, "hij")
  )).toDF("id", "name")

  val df2 = spark.sparkContext.parallelize(Seq(
    (19, "x"),
    (29, "y"),
    (39, "z")
  )).toDF("age", "address")

  val schema = StructType(df1.schema.fields ++ df2.schema.fields)

  val df1df2 = df1.rdd.zip(df2.rdd).map{
    case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

  spark.createDataFrame(df1df2, schema).show()
Run Code Online (Sandbox Code Playgroud)

这就是您仅使用数据框的方式

import org.apache.spark.sql.functions._

val ddf1 = df1.withColumn("row_id", monotonically_increasing_id())
val ddf2 = df2.withColumn("row_id", monotonically_increasing_id())

val result = ddf1.join(ddf2, Seq("row_id")).drop("row_id")

result.show()
Run Code Online (Sandbox Code Playgroud)

添加新列 asrow_id并使用键 as 连接两个数据框row_id

希望这可以帮助!

  • 请注意,基于“monotonically_increasing_id”的加入并不是一个好主意,因为它不能保证连续的数字,请参见此处的示例:/sf/ask/3103561541/ -加入两个 pyspark-dataframes-having-no (3认同)