连接Apache Spark DataFrame中的列

Nip*_*pun 95 sql dataframe apache-spark apache-spark-sql

我们如何在Apache Spark DataFrame中连接两列?我们可以使用Spark SQL中的任何函数吗?

zer*_*323 150

使用原始SQL,您可以使用CONCAT:

  • 在Python中

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
    Run Code Online (Sandbox Code Playgroud)
  • 在斯卡拉

    import sqlContext.implicits._
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
    Run Code Online (Sandbox Code Playgroud)

从Spark 1.5.0开始,您可以使用concatDataFrame API的功能:

还有concat_ws一个函数,它将字符串分隔符作为第一个参数.


muo*_*uon 38

以下是您可以进行自定义命名的方法

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()
Run Code Online (Sandbox Code Playgroud)

给,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+
Run Code Online (Sandbox Code Playgroud)

通过连接创建新列:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+
Run Code Online (Sandbox Code Playgroud)

  • `lit`创建一列`_` (3认同)

Ign*_*rre 21

在Spark Scala中连接字符串列的一个选项是使用concat.

有必要检查空值.因为如果其中一列为null,则即使其他列之一确实具有信息,结果也将为null.

使用concatwithColumn:

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))
Run Code Online (Sandbox Code Playgroud)

使用concatselect:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")
Run Code Online (Sandbox Code Playgroud)

使用这两种方法,您将获得一个NEW_COLUMN,其值是列的串联:来自原始df的COL1和COL2.

  • @IgnacioAlorre 如果您使用 `concat_ws` 而不是 `concat`,则可以避免检查 NULL。 (3认同)

Ani*_*non 20

连接(*列)

v1.5 及更高版本

将多个输入列连接到一个列中。该函数适用于字符串、二进制和兼容的数组列。

例如: new_df = df.select(concat(df.a, df.b, df.c))


concat_ws(sep, *cols)

v1.5 及更高版本

类似于concat但使用指定的分隔符。

例如: new_df = df.select(concat_ws('-', df.col1, df.col2))


map_concat(*cols)

v2.4 及更高版本

用于连接映射,返回所有给定映射的并集。

例如: new_df = df.select(map_concat("map1", "map2"))


使用字符串连接运算符 ( ||):

v2.3 及更高版本

例如: df = spark.sql("select col_a || col_b || col_c as abc from table_x")

参考:Spark sql 文档


Dan*_*tha 17

如果要使用DF执行此操作,可以使用udf基于现有列添加新列.

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
Run Code Online (Sandbox Code Playgroud)


Kri*_*has 9

从Spark 2.3(SPARK-22771)起,Spark SQL支持串联运算符||

例如;

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")
Run Code Online (Sandbox Code Playgroud)


Ted*_*lay 7

这是为pyspark执行此操作的另一种方法:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit

#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])

#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))

#Show the new data frame
personDF.show()

----------RESULT-------------------------

84
+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+
Run Code Online (Sandbox Code Playgroud)


小智 5

如果您不知道数据框中的列数或名称,这是一个建议。

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
Run Code Online (Sandbox Code Playgroud)