Joh*_*ohn 7 apache-spark pyspark spark-dataframe
有没有办法压缩任意嵌套的Spark Dataframe?我所看到的大多数工作都是针对特定的模式编写的,我希望能够通过不同的嵌套类型(例如StructType,ArrayType,MapType等)来泛化一个Dataframe.
假设我有一个类似的架构:
StructType(List(StructField(field1,...), StructField(field2,...), ArrayType(StructType(List(StructField(nested_field1,...), StructField(nested_field2,...)),nested_array,...)))
Run Code Online (Sandbox Code Playgroud)
希望将其调整为具有如下结构的平台:
field1
field2
nested_array.nested_field1
nested_array.nested_field2
Run Code Online (Sandbox Code Playgroud)
仅供参考,寻找Pyspark的建议,但其他风味的Spark也值得赞赏.
MaF*_*aFF 14
这个问题可能有点旧,但对于那些仍在寻找解决方案的人来说,你可以使用select*来内联复杂的数据类型:
首先让我们创建嵌套的数据帧:
from pyspark.sql import HiveContext
hc = HiveContext(sc)
nested_df = hc.read.json(sc.parallelize(["""
{
"field1": 1,
"field2": 2,
"nested_array":{
"nested_field1": 3,
"nested_field2": 4
}
}
"""]))
Run Code Online (Sandbox Code Playgroud)
现在要压扁它:
flat_df = nested_df.select("field1", "field2", "nested_array.*")
Run Code Online (Sandbox Code Playgroud)
你会在这里找到有用的例子https://docs.databricks.com/spark/latest/spark-sql/complex-types.html
如果嵌套数组太多,可以使用:
flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = nested_df.select(*flat_cols, *[c + ".*" for c in nested_cols])
Run Code Online (Sandbox Code Playgroud)
这会展平具有结构类型和 数组类型的嵌套 df。通过 Json 读取数据时通常会有所帮助。对此进行了改进/sf/answers/3957342161/
from pyspark.sql.types import *
from pyspark.sql import functions as f
def flatten_structs(nested_df):
stack = [((), nested_df)]
columns = []
while len(stack) > 0:
parents, df = stack.pop()
array_cols = [
c[0]
for c in df.dtypes
if c[1][:5] == "array"
]
flat_cols = [
f.col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if c[1][:6] != "struct"
]
nested_cols = [
c[0]
for c in df.dtypes
if c[1][:6] == "struct"
]
columns.extend(flat_cols)
for nested_col in nested_cols:
projected_df = df.select(nested_col + ".*")
stack.append((parents + (nested_col,), projected_df))
return nested_df.select(columns)
def flatten_array_struct_df(df):
array_cols = [
c[0]
for c in df.dtypes
if c[1][:5] == "array"
]
while len(array_cols) > 0:
for array_col in array_cols:
cols_to_select = [x for x in df.columns if x != array_col ]
df = df.withColumn(array_col, f.explode(f.col(array_col)))
df = flatten_structs(df)
array_cols = [
c[0]
for c in df.dtypes
if c[1][:5] == "array"
]
return df
flat_df = flatten_array_struct_df(df)
Run Code Online (Sandbox Code Playgroud)
**
这是我的最终方法:
1)将数据帧中的行映射到字典的rdd。在线查找合适的 python 代码来扁平化 dict。
flat_rdd = nested_df.map(lambda x : flatten(x))
Run Code Online (Sandbox Code Playgroud)
在哪里
def flatten(x):
x_dict = x.asDict()
...some flattening code...
return x_dict
Run Code Online (Sandbox Code Playgroud)
2)将RDD[dict]转换回数据帧
flat_df = sqlContext.createDataFrame(flat_rdd)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
13710 次 |
| 最近记录: |