在Spark UDF中处理所有列/整个行

jav*_*dba 2 scala apache-spark apache-spark-sql

对于包含字符串和数字数据类型混合的数据框,目标是创建features一个minhash包含所有这些类型的新列。

尽管这可以通过执行dataframe.toRDD来完成,但是下一步要做的只是简单地将其转换RDD 数据帧,这样做是很昂贵的。

因此,有一种方法可以执行udf以下操作:

val wholeRowUdf = udf( (row: Row) =>  computeHash(row))
Run Code Online (Sandbox Code Playgroud)

Rowspark sql当然不是数据类型-因此这将无法正常显示。

更新/说明 我意识到创建一个在其中运行的全行UDF很容易withColumn。不清楚的是可以在spark sql语句内使用什么:

val featurizedDf = spark.sql("select wholeRowUdf( what goes here? ) as features 
                              from mytable")
Run Code Online (Sandbox Code Playgroud)

Ram*_*jan 7

当然,Row不是spark sql数据类型-因此,它将无法如图所示工作。

我将展示您可以使用struct内置函数使用Row将所有列或选定的列传递给udf函数

首先我定义一个 dataframe

val df = Seq(
  ("a", "b", "c"),
  ("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
//    +----+----+----+
//    |col1|col2|col3|
//    +----+----+----+
//    |a   |b   |c   |
//    |a1  |b1  |c1  |
//    +----+----+----+
Run Code Online (Sandbox Code Playgroud)

然后,我定义一个函数,使一行中的所有元素成为一个字符串,并以一个字符串分隔,(因为您具有computeHash函数)

import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")
Run Code Online (Sandbox Code Playgroud)

然后我在udf功能中使用它

import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))
Run Code Online (Sandbox Code Playgroud)

最后,我udf使用withColumn函数和struct 内置函数调用函数,将选定的列合并为一列并传递给udf函数

df.withColumn("contcatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
//    +----+----+----+-------------+
//    |col1|col2|col3|contcatenated|
//    +----+----+----+-------------+
//    |a   |b   |c   |a, b, c      |
//    |a1  |b1  |c1  |a1, b1, c1   |
//    +----+----+----+-------------+
Run Code Online (Sandbox Code Playgroud)

因此,您可以看到Row可用于传递整行作为参数

您甚至可以一次传递所有列

val columns = df.columns
df.withColumn("contcatenated", combineUdf(struct(columns.map(col): _*)))
Run Code Online (Sandbox Code Playgroud)

更新

您也可以使用sql查询实现相同的功能,只需将udf函数注册

df.createOrReplaceTempView("tempview")
sqlContext.udf.register("combineUdf", combineUdf)
sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")
Run Code Online (Sandbox Code Playgroud)

它会为您提供与上述相同的结果

现在,如果您不想对列名进行硬编码,则可以根据需要选择列名并将其设置为字符串

val columns = df.columns.map(x => "`"+x+"`").mkString(",")
sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")
Run Code Online (Sandbox Code Playgroud)

我希望答案是有帮助的


jav*_*dba 1

我想出了一个解决方法:将列名称放入任何现有spark sql函数中以生成新的输出列:

concat(${df.columns.tail.mkString(",'-',")}) as Features
Run Code Online (Sandbox Code Playgroud)

在这种情况下,数据框中的第一列是目标并被排除。这是这种方法的另一个优点:可以操纵实际的列列表。

这种方法避免了不必要的 RDD/数据帧重组。