Transposing rows into columns in Spark SQL (pyspark)

Gow*_*n V 1 sql apache-spark-sql pyspark

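I want to perform the following transformation in Spark. My goal is to get the output shown below, and I expect that if I can produce the intermediate output first, the final output will be easy to derive. Any ideas on how to turn the rows into columns would be very helpful.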

RowID  Name  Place
1      Gaga India,US,UK
1      Katy UK,India,Europe
1      Bey  Europe
2      Gaga Null
2      Katy India,Europe
2      Bey  US
3      Gaga Europe
3      Katy US
3      Bey  Null

Output:

RowID   Id  Gaga    Katy    Bey
1       1   India   UK      Europe
1       2   US      India   Null
1       3   UK      Europe  Null
2       1   Null    India   US
2       2   Null    Europe  Null
3       1   Europe  US      Null


Intermediate Output:

RowID   Gaga         Katy               Bey
1       India,US,UK  UK,India,Europe    Europe
2       Null         India,Europe       US
3       Europe       US                 Null

Sur*_*esh 5

I tried this using DataFrame functions and a UDF. Hope it helps.

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import IntegerType
>>> from functools import reduce
>>> from pyspark.sql import DataFrame
>>> from pyspark.sql import Window
>>> l = [(1,'Gaga','India,US,UK'),(1,'Katy','UK,India,Europe'),(1,'Bey','Europe'),(2,'Gaga',None),(2,'Katy','India,Europe'),(2,'Bey','US'),(3,'Gaga','Europe'),
... (3,'Katy','US'),(3,'Bey',None)]
>>> df = spark.createDataFrame(l,['RowID','Name','Place'])
>>> df = df.withColumn('Placelist',F.split(df.Place,','))
>>> df.show()
+-----+----+---------------+-------------------+
|RowID|Name|          Place|          Placelist|
+-----+----+---------------+-------------------+
|    1|Gaga|    India,US,UK|    [India, US, UK]|
|    1|Katy|UK,India,Europe|[UK, India, Europe]|
|    1| Bey|         Europe|           [Europe]|
|    2|Gaga|           null|               null|
|    2|Katy|   India,Europe|    [India, Europe]|
|    2| Bey|             US|               [US]|
|    3|Gaga|         Europe|           [Europe]|
|    3|Katy|             US|               [US]|
|    3| Bey|           null|               null|
+-----+----+---------------+-------------------+

>>> udf1 = F.udf(lambda x : len(x) if x is not None else 0,IntegerType())
>>> maxlen = df.agg(F.max(udf1('Placelist'))).first()[0]
>>> df1 = df.groupby('RowID').pivot('Name').agg(F.first('Placelist'))
>>> df1.show()
+-----+--------+---------------+-------------------+
|RowID|     Bey|           Gaga|               Katy|
+-----+--------+---------------+-------------------+
|    1|[Europe]|[India, US, UK]|[UK, India, Europe]|
|    3|    null|       [Europe]|               [US]|
|    2|    [US]|           null|    [India, Europe]|
+-----+--------+---------------+-------------------+

>>> # Tag each slice with its 1-based list position, so that ID follows the
>>> # original order of the places rather than the sort order of one column.
>>> finaldf = reduce(
...     DataFrame.unionAll,
...     (df1.select("RowID", F.lit(i + 1), F.col("Bey").getItem(i), F.col("Gaga").getItem(i), F.col("Katy").getItem(i))
...         for i in range(maxlen))
... ).toDF('RowID', 'ID', *df1.columns[1:]).na.drop('all', subset=df1.columns[1:]).orderBy('RowID', 'ID')
>>> finaldf.select('RowID','ID','Gaga','Katy','Bey').show()
+-----+---+------+------+------+
|RowID| ID|  Gaga|  Katy|   Bey|
+-----+---+------+------+------+
|    1|  1| India|    UK|Europe|
|    1|  2|    US| India|  null|
|    1|  3|    UK|Europe|  null|
|    2|  1|  null| India|    US|
|    2|  2|  null|Europe|  null|
|    3|  1|Europe|    US|  null|
+-----+---+------+------+------+
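
For small samples, the expected result can be cross-checked without Spark. The following is a minimal plain-Python sketch and is not part of the original answer; the helper name `transpose` and the `names` column order are assumptions made for illustration. It builds the intermediate pivot from the question and then walks the place lists by position, padding shorter lists with `None`.

```python
from itertools import zip_longest

# Same sample rows as in the question: (RowID, Name, Place)
rows = [
    (1, "Gaga", "India,US,UK"), (1, "Katy", "UK,India,Europe"), (1, "Bey", "Europe"),
    (2, "Gaga", None), (2, "Katy", "India,Europe"), (2, "Bey", "US"),
    (3, "Gaga", "Europe"), (3, "Katy", "US"), (3, "Bey", None),
]
names = ["Gaga", "Katy", "Bey"]  # assumed output column order


def transpose(rows, names):
    """Return (RowID, Id, *places) tuples, one per list position."""
    # Step 1: pivot to {RowID: {Name: [places]}} (the intermediate output).
    pivoted = {}
    for row_id, name, place in rows:
        places = place.split(",") if place is not None else []
        pivoted.setdefault(row_id, {})[name] = places

    # Step 2: walk the lists in parallel; shorter lists are padded with None.
    result = []
    for row_id in sorted(pivoted):
        columns = [pivoted[row_id].get(name, []) for name in names]
        for pos, values in enumerate(zip_longest(*columns), start=1):
            result.append((row_id, pos, *values))
    return result


expected = transpose(rows, names)
for record in expected:
    print(record)
```

The `pos` counter plays the role of the `Id` column in the desired output. In Spark, `F.posexplode` exposes a comparable per-element position, which may be a way to avoid looping over `range(maxlen)`.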