Nik*_*shi 20 scala dataframe apache-spark apache-spark-sql apache-spark-ml
I have a DataFrame with the schema:
root
|-- label: string (nullable = true)
|-- features: struct (nullable = true)
| |-- feat1: string (nullable = true)
| |-- feat2: string (nullable = true)
| |-- feat3: string (nullable = true)
While I am able to filter the DataFrame using
val data = rawData
  .filter( !(rawData("features.feat1") <=> "100") )
I am unable to drop the column using
val data = rawData
  .drop("features.feat1")
Is there something I am doing wrong here? I also tried (unsuccessfully) doing drop(rawData("features.feat1")), though it does not make much sense to do so.

Thanks in advance,
Nikhil
zer*_*323 23
It's just a programming exercise, but you can try something like this:
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try

case class DFWithDropFrom(df: DataFrame) {
  // find the top-level field with the given name, if it exists
  def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
  }

  // succeed only if that field is actually a struct
  def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
  }

  // rebuild the struct from the fields that survive the drop
  def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
  }

  // replace the struct column; on any failure, return the DataFrame unchanged
  def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))
      .getOrElse(df)
  }
}
Example usage:
scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features
scala> case class record(label: String, features: features)
defined class record
scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]
scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
| label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+
scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
Add an implicit conversion and you're good to go.
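For instance, a minimal sketch of such a conversion, assuming DFWithDropFrom from above is in scope (the wrapper name DropFromSyntax is illustrative, not part of the answer):

// illustrative implicit class so dropFrom can be called directly on a DataFrame
implicit class DropFromSyntax(df: DataFrame) {
  def dropFrom(source: String, toDrop: Array[String]): DataFrame =
    DFWithDropFrom(df).dropFrom(source, toDrop)
}

// the call site then reads naturally:
// df.dropFrom("features", Array("feat1")).show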
Mic*_*tor 15
This version allows you to remove nested columns at any level:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}

/**
 * Various Spark utilities and extensions of DataFrame
 */
object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else {
      colType match {
        case colType: StructType =>
          if (dropColName.startsWith(s"${fullColName}.")) {
            Some(struct(
              colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
          } else {
            Some(col)
          }
        case other => Some(col)
      }
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
   * Extended version of DataFrame that allows to operate on nested fields
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}
Usage:
import DataFrameUtils._
df.dropNestedColumn("a.b.c.d")
For Spark 3.1+, you can use the dropFields method on struct-type columns:

An expression that drops fields in StructType by name. This is a no-op if the schema doesn't contain the field name(s).
val df = sql("SELECT named_struct('feat1', 1, 'feat2', 2, 'feat3', 3) features")
val df1 = df.withColumn("features", $"features".dropFields("feat1"))
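A further sketch, assuming Spark 3.1+ in spark-shell (the variable names here are illustrative): dropFields also accepts multiple names and dot-separated paths, and per the quoted docs a name missing from the schema is simply a no-op.

// drop several fields at once
val df2 = df.withColumn("features", $"features".dropFields("feat1", "feat2"))

// dot-separated names reach into nested structs
val nested = sql("SELECT named_struct('a', named_struct('b', 1, 'c', 2)) s")
val pruned = nested.withColumn("s", $"s".dropFields("a.b"))

// a field name not present in the schema leaves the column unchanged
val unchanged = df.withColumn("features", $"features".dropFields("no_such_field"))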
Extending spektom's answer, with support for array types:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
              .flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                  case Some(x) => Some(x.alias(f.name))
                  case None => None
                })
              : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              Some(struct(innerType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
            case _ => Some(col) // non-struct array elements carry no named fields to drop
          }
        case other => Some(col)
      }
    } else {
      Some(col)
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
   * Extended version of DataFrame that allows to operate on nested fields
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}
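A usage sketch for the array case, assuming the object above is in scope (the items column and its fields are hypothetical):

import DataFrameUtils._

// hypothetical schema: items: array<struct<a: string, b: string>>
// drops field "a" from the struct of every array element
val pruned = df.dropNestedColumn("items.a")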
Expanding on mmendez.semantic's answer here, and accounting for the issues described in the sub-thread:
def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
  if (fullColName.equals(dropColName)) {
    None
  } else if (dropColName.startsWith(s"$fullColName.")) {
    colType match {
      case colType: StructType =>
        Some(struct(
          colType.fields
            .flatMap(f =>
              dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                case Some(x) => Some(x.alias(f.name))
                case None => None
              })
            : _*))
      case colType: ArrayType =>
        colType.elementType match {
          case innerType: StructType =>
            // we are potentially dropping a column from within a struct, that is itself inside an array
            // Spark has some very strange behavior in this case, which they insist is not a bug
            // see https://issues.apache.org/jira/browse/SPARK-31779 and associated comments
            // and also the thread here: /sf/answers/2796066871/
            // this is a workaround for that behavior

            // first, get all struct fields
            val innerFields = innerType.fields
            // next, create a new type for all the struct fields EXCEPT the column that is to be dropped
            // we will need this later
            val preserveNamesStruct = ArrayType(StructType(
              innerFields.filterNot(f => s"$fullColName.${f.name}".equals(dropColName))
            ))
            // next, apply dropSubColumn recursively to build up the new values after dropping the column
            val filteredInnerFields = innerFields.flatMap(f =>
              dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                case Some(x) => Some(x.alias(f.name))
                case None => None
              }
            )
            // finally, use arrays_zip to unwrap the arrays that were introduced by building up the new, filtered
            // struct in this way (see comments in SPARK-31779), and then cast to the StructType we created earlier
            // to get the original names back
            Some(arrays_zip(filteredInnerFields: _*).cast(preserveNamesStruct))
          case _ => Some(col) // non-struct array elements carry no named fields to drop
        }
      case _ => Some(col)
    }
  } else {
    Some(col)
  }
}

def dropColumn(df: DataFrame, colName: String): DataFrame = {
  df.schema.fields.flatMap(f => {
    if (colName.startsWith(s"${f.name}.")) {
      dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
        case Some(x) => Some((f.name, x))
        case None => None
      }
    } else {
      None
    }
  }).foldLeft(df.drop(colName)) {
    case (df, (colName, column)) => df.withColumn(colName, column)
  }
}
Usage in spark-shell:
// if defining the functions above in your spark-shell session, you first need imports
import org.apache.spark.sql._
import org.apache.spark.sql.functions._ // col, struct, arrays_zip
import org.apache.spark.sql.types._
// now you can paste the function definitions
// create a deeply nested and complex JSON structure
val jsonData = """{
  "foo": "bar",
  "top": {
    "child1": 5,
    "child2": [
      {
        "child2First": "one",
        "child2Second": 2,
        "child2Third": -19.51
      }
    ],
    "child3": ["foo", "bar", "baz"],
    "child4": [
      {
        "child2First": "two",
        "child2Second": 3,
        "child2Third": 16.78
      }
    ]
  }
}"""
// read it into a DataFrame
val df = spark.read.option("multiline", "true").json(Seq(jsonData).toDS())
// remove a sub-column
val modifiedDf = dropColumn(df, "top.child2.child2First")
modifiedDf.printSchema
root
|-- foo: string (nullable = true)
|-- top: struct (nullable = false)
| |-- child1: long (nullable = true)
| |-- child2: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
| |-- child3: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- child4: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2First: string (nullable = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
modifiedDf.show(truncate=false)
+---+------------------------------------------------------+
|foo|top |
+---+------------------------------------------------------+
|bar|[5, [[2, -19.51]], [foo, bar, baz], [[two, 3, 16.78]]]|
+---+------------------------------------------------------+