ech*_*hen 37 scala apache-spark apache-spark-sql
所有,
是否有一种优雅且可接受的方式来使用嵌套的列展平Spark SQL表(Parquet) StructType
例如
如果我的架构是:
foo
|_bar
|_baz
x
y
z
Run Code Online (Sandbox Code Playgroud)
如何在不依靠手动运行的情况下将其选择为展平的表格形式
df.select("foo.bar","foo.baz","x","y","z")
Run Code Online (Sandbox Code Playgroud)
换句话说,如何在a StructType和a下以编程方式获得上述代码的结果DataFrame
Dav*_*fin 65
简短的回答是,没有"接受"的方法来做到这一点,但你可以非常优雅地使用递归函数,select(...)通过遍历整个函数来生成语句DataFrame.schema.
递归函数应该返回一个Array[Column].每次函数命中时StructType,它都会调用自身并将返回的函数附加Array[Column]到它自己的函数中Array[Column].
就像是:
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix + "." + f.name)
f.dataType match {
case st: StructType => flattenSchema(st, colName)
case _ => Array(col(colName))
}
})
}
Run Code Online (Sandbox Code Playgroud)
然后你会像这样使用它:
df.select(flattenSchema(df.schema):_*)
Run Code Online (Sandbox Code Playgroud)
V. *_*mma 19
我正在改进我以前的答案,并在接受的答案的评论中提出我自己的问题的解决方案.
此接受的解决方案创建一个Column对象数组,并使用它来选择这些列.在星火,如果你有一个嵌套的数据帧,可以选择这样的子列:df.select("Parent.Child")这会返回一个数据框与孩子列的值,并命名为儿童.但是,如果你有不同的父结构的属性相同的名称,你失去对家长的信息,并最终可能具有相同的列名,不能由名字再访问他们,因为他们是毫不含糊的.
这是我的问题.
我找到了解决问题的方法,也许它可以帮助别人.我flattenSchema单独打电话:
val flattenedSchema = flattenSchema(df.schema)
Run Code Online (Sandbox Code Playgroud)
并返回一个Column对象数组.我select()会将原始列名称作为字符串映射到自己,而不是在最后一个级别的子项中返回一个DataFrame Parent.Child,而是在选择列之后,将其重命名为(我也替换了点)为了我的方便,使用下划线):Parent.ChildChild
val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))
Run Code Online (Sandbox Code Playgroud)
然后你可以使用原始答案中显示的select函数:
var newDf = df.select(renamedCols:_*)
Run Code Online (Sandbox Code Playgroud)
Eva*_*n V 12
只是想分享我对Pyspark的解决方案 - 它或多或少是@David Griffin解决方案的翻译,所以它支持任何级别的嵌套对象.
from pyspark.sql.types import StructType, ArrayType
def flatten(schema, prefix=None):
fields = []
for field in schema.fields:
name = prefix + '.' + field.name if prefix else field.name
dtype = field.dataType
if isinstance(dtype, ArrayType):
dtype = dtype.elementType
if isinstance(dtype, StructType):
fields += flatten(dtype, prefix=name)
else:
fields.append(name)
return fields
df.select(flatten(df.schema)).show()
Run Code Online (Sandbox Code Playgroud)
我DataFrame#flattenSchema向开源spark-daria项目添加了一个方法。
下面介绍了如何在代码中使用该函数。
import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.flattenSchema().show()
+-------+-------+---------+----+---+
|foo.bar|foo.baz| x| y| z|
+-------+-------+---------+----+---+
| this| is|something|cool| ;)|
+-------+-------+---------+----+---+
Run Code Online (Sandbox Code Playgroud)
您还可以使用该方法指定不同的列名分隔符flattenSchema()。
df.flattenSchema(delimiter = "_").show()
+-------+-------+---------+----+---+
|foo_bar|foo_baz| x| y| z|
+-------+-------+---------+----+---+
| this| is|something|cool| ;)|
+-------+-------+---------+----+---+
Run Code Online (Sandbox Code Playgroud)
这个分隔符参数非常重要。如果您要展平架构以在 Redshift 中加载表,则将无法使用句点作为分隔符。
这是生成此输出的完整代码片段。
val data = Seq(
Row(Row("this", "is"), "something", "cool", ";)")
)
val schema = StructType(
Seq(
StructField(
"foo",
StructType(
Seq(
StructField("bar", StringType, true),
StructField("baz", StringType, true)
)
),
true
),
StructField("x", StringType, true),
StructField("y", StringType, true),
StructField("z", StringType, true)
)
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(data),
StructType(schema)
)
df.flattenSchema().show()
Run Code Online (Sandbox Code Playgroud)
底层代码类似于 David Griffin 的代码(如果您不想将 Spark-daria 依赖项添加到您的项目中)。
object StructTypeHelpers {
def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
schema.fields.flatMap(structField => {
val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name
structField.dataType match {
case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
case _ => Array(col(codeColName).alias(colName))
}
})
}
}
object DataFrameExt {
implicit class DataFrameMethods(df: DataFrame) {
def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
df.select(
StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
)
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
30082 次 |
| 最近记录: |