jav*_*dba 2 scala apache-spark apache-spark-sql
对于包含字符串和数字数据类型混合的数据框,目标是创建features
一个minhash
包含所有这些类型的新列。
尽管这可以通过执行dataframe.toRDD
来完成,但是下一步要做的只是简单地将其转换RDD
回数据帧,这样做是很昂贵的。
因此,有一种方法可以执行udf
以下操作:
val wholeRowUdf = udf( (row: Row) => computeHash(row))
Run Code Online (Sandbox Code Playgroud)
Row
spark sql
当然不是数据类型-因此这将无法正常显示。
更新/说明 我意识到创建一个在其中运行的全行UDF很容易withColumn
。不清楚的是可以在spark sql
语句内使用什么:
val featurizedDf = spark.sql("select wholeRowUdf( what goes here? ) as features
from mytable")
Run Code Online (Sandbox Code Playgroud)
当然,Row不是spark sql数据类型-因此,它将无法如图所示工作。
我将展示您可以使用struct内置函数使用Row将所有列或选定的列传递给udf函数
首先我定义一个 dataframe
val df = Seq(
("a", "b", "c"),
("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
// +----+----+----+
// |col1|col2|col3|
// +----+----+----+
// |a |b |c |
// |a1 |b1 |c1 |
// +----+----+----+
Run Code Online (Sandbox Code Playgroud)
然后,我定义一个函数,使一行中的所有元素成为一个字符串,并以一个字符串分隔,
(因为您具有computeHash函数)
import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")
Run Code Online (Sandbox Code Playgroud)
然后我在udf
功能中使用它
import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))
Run Code Online (Sandbox Code Playgroud)
最后,我udf
使用withColumn
函数和struct
内置函数调用函数,将选定的列合并为一列并传递给udf
函数
df.withColumn("contcatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
// +----+----+----+-------------+
// |col1|col2|col3|contcatenated|
// +----+----+----+-------------+
// |a |b |c |a, b, c |
// |a1 |b1 |c1 |a1, b1, c1 |
// +----+----+----+-------------+
Run Code Online (Sandbox Code Playgroud)
因此,您可以看到Row可用于传递整行作为参数
您甚至可以一次传递所有列
val columns = df.columns
df.withColumn("contcatenated", combineUdf(struct(columns.map(col): _*)))
Run Code Online (Sandbox Code Playgroud)
更新
您也可以使用sql查询实现相同的功能,只需将udf函数注册为
df.createOrReplaceTempView("tempview")
sqlContext.udf.register("combineUdf", combineUdf)
sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")
Run Code Online (Sandbox Code Playgroud)
它会为您提供与上述相同的结果
现在,如果您不想对列名进行硬编码,则可以根据需要选择列名并将其设置为字符串
val columns = df.columns.map(x => "`"+x+"`").mkString(",")
sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")
Run Code Online (Sandbox Code Playgroud)
我希望答案是有帮助的
我想出了一个解决方法:将列名称放入任何现有spark sql
函数中以生成新的输出列:
concat(${df.columns.tail.mkString(",'-',")}) as Features
Run Code Online (Sandbox Code Playgroud)
在这种情况下,数据框中的第一列是目标并被排除。这是这种方法的另一个优点:可以操纵实际的列列表。
这种方法避免了不必要的 RDD/数据帧重组。
归档时间: |
|
查看次数: |
4812 次 |
最近记录: |