将Spark Dataframe字符串列拆分为多个列

Pet*_*ney 47 apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql

我见过各种各样的人都认为这Dataframe.explode是一种有用的方法,但它会导致比原始数据帧更多的行,这根本不是我想要的.我只是想做Dataframe相当于非常简单:

rdd.map(lambda row: row + [row.my_str_col.split('-')])
Run Code Online (Sandbox Code Playgroud)

它看起来像:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg
Run Code Online (Sandbox Code Playgroud)

并将其转换为:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg
Run Code Online (Sandbox Code Playgroud)

我知道pyspark.sql.functions.split(),但它导致嵌套数组列而不是我想要的两个顶级列.

理想情况下,我希望这些新列也可以命名.

Pet*_*ney 78

pyspark.sql.functions.split()这是正确的方法 - 您只需将嵌套的ArrayType列展平为多个顶级列.在这种情况下,每个数组只包含2个项目,这很容易.您只需使用Column.getItem()以列本身的形式检索数组的每个部分:

split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))
Run Code Online (Sandbox Code Playgroud)

结果将是:

col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg
Run Code Online (Sandbox Code Playgroud)

我不确定如何在一般情况下解决这个问题,其中嵌套数组的行与行不同.

  • 最终使用了一个python循环,即 - 对于范围内的i(max(len_of_split):df = df.withcolumn(split.getItem(i)) (3认同)
  • 您是否找到了解决不平衡情况的解决方案? (2认同)
  • 有没有办法将剩余的项目放在一列中?即第三列中的`split_col.getItem(2 - n)`。我想像上面的循环那样为所有项目创建列然后连接它们可能会起作用,但我不知道这是否非常有效。 (2认同)

pau*_*ult 26

这是一般情况的解决方案,不涉及需要提前知道数组的长度,使用collect或使用udfs.不幸的是,这仅适用于spark2.1及更高版本,因为它需要该posexplode功能.

假设您有以下DataFrame:

df = spark.createDataFrame(
    [
        [1, 'A, B, C, D'], 
        [2, 'E, F, G'], 
        [3, 'H, I'], 
        [4, 'J']
    ]
    , ["num", "letters"]
)
df.show()
#+---+----------+
#|num|   letters|
#+---+----------+
#|  1|A, B, C, D|
#|  2|   E, F, G|
#|  3|      H, I|
#|  4|         J|
#+---+----------+
Run Code Online (Sandbox Code Playgroud)

拆分letters列,然后使用posexplode爆炸结果数组以及数组中的位置.接下来用于在此数组pyspark.sql.functions.expr中的index处获取元素pos.

import pyspark.sql.functions as f

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .show()
#+---+------------+---+---+
#|num|     letters|pos|val|
#+---+------------+---+---+
#|  1|[A, B, C, D]|  0|  A|
#|  1|[A, B, C, D]|  1|  B|
#|  1|[A, B, C, D]|  2|  C|
#|  1|[A, B, C, D]|  3|  D|
#|  2|   [E, F, G]|  0|  E|
#|  2|   [E, F, G]|  1|  F|
#|  2|   [E, F, G]|  2|  G|
#|  3|      [H, I]|  0|  H|
#|  3|      [H, I]|  1|  I|
#|  4|         [J]|  0|  J|
#+---+------------+---+---+
Run Code Online (Sandbox Code Playgroud)

现在我们从这个结果中创建两个新列.第一个是我们新列的名称,它将是letter数组中的索引和索引的串联.第二列将是数组中相应索引处的值.我们通过利用其功能pyspark.sql.functions.expr允许我们使用列值作为参数来获得后者.

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .show()
#+---+-------+---+
#|num|   name|val|
#+---+-------+---+
#|  1|letter0|  A|
#|  1|letter1|  B|
#|  1|letter2|  C|
#|  1|letter3|  D|
#|  2|letter0|  E|
#|  2|letter1|  F|
#|  2|letter2|  G|
#|  3|letter0|  H|
#|  3|letter1|  I|
#|  4|letter0|  J|
#+---+-------+---+
Run Code Online (Sandbox Code Playgroud)

现在我们可以只groupBy使用numpivotDataFrame.把它们放在一起,我们得到:

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .groupBy("num").pivot("name").agg(f.first("val"))\
    .show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#|  1|      A|      B|      C|      D|
#|  3|      H|      I|   null|   null|
#|  2|      E|      F|      G|   null|
#|  4|      J|   null|   null|   null|
#+---+-------+-------+-------+-------+
Run Code Online (Sandbox Code Playgroud)

  • 仅供参考,我尝试使用 3909 个元素拆分约 170 万原始行,但速度太慢/一小时后未完成 (2认同)

小智 15

这是另一种方法,以防您想用分隔符拆分字符串。

import pyspark.sql.functions as f

df = spark.createDataFrame([("1:a:2001",),("2:b:2002",),("3:c:2003",)],["value"])
df.show()
+--------+
|   value|
+--------+
|1:a:2001|
|2:b:2002|
|3:c:2003|
+--------+

df_split = df.select(f.split(df.value,":")).rdd.flatMap(
              lambda x: x).toDF(schema=["col1","col2","col3"])

df_split.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|2001|
|   2|   b|2002|
|   3|   c|2003|
+----+----+----+
Run Code Online (Sandbox Code Playgroud)

我不认为这种来回向 RDD 的转换会减慢你的速度......也不要担心最后的模式规范:它是可选的,你可以避免将解决方案推广到未知列大小的数据。