在spark scala中将1列拆分为3列

Mat*_*rer 30 scala apache-spark

我在Spark中使用scala有一个数据框,它有一个我需要拆分的列.

scala> test.show
+-------------+
|columnToSplit|
+-------------+
|        a.b.c|
|        d.e.f|
+-------------+
Run Code Online (Sandbox Code Playgroud)

我需要将此列拆分为如下所示:

+--------------+
|col1|col2|col3|
|   a|   b|   c|
|   d|   e|   f|
+--------------+
Run Code Online (Sandbox Code Playgroud)

我正在使用Spark 2.0.0

谢谢

小智 68

尝试:

df.withColumn("_tmp", split($"columnToSplit", "\\.")).select(
  $"_tmp".getItem(0).as("col1"),
  $"_tmp".getItem(1).as("col2"),
  $"_tmp".getItem(2).as("col3")
).drop("_tmp")
Run Code Online (Sandbox Code Playgroud)

  • @Jake import org.apache.spark.sql.functions.split使用这个 (8认同)

Psi*_*dom 30

要以编程方式做到这一点,你可以创建一个表达式序列(0 until 3).map(i => col("temp").getItem(i).as(s"col$i"))(假设你需要3列,结果),然后将其应用到select: _*语法:

df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
    (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+
Run Code Online (Sandbox Code Playgroud)

保留所有列:

df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
    col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+-------------+---------+----+----+----+
|columnToSplit|     temp|col0|col1|col2|
+-------------+---------+----+----+----+
|        a.b.c|[a, b, c]|   a|   b|   c|
|        d.e.f|[d, e, f]|   d|   e|   f|
+-------------+---------+----+----+----+
Run Code Online (Sandbox Code Playgroud)

如果您正在使用pyspark,请使用列表推导来替换mapin scala:

df = spark.createDataFrame([['a.b.c'], ['d.e.f']], ['columnToSplit'])
from pyspark.sql.functions import col, split

(df.withColumn('temp', split('columnToSplit', '\\.'))
   .select(*(col('temp').getItem(i).alias(f'col{i}') for i in range(3))
).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+
Run Code Online (Sandbox Code Playgroud)

  • 我们可以拆分为任意数量的列吗?为什么 3 必须被硬编码? (2认同)

Sas*_*ter 17

避免选择部分的解决方案.当您只想追加新列时,这很有用:

case class Message(others: String, text: String)

val r1 = Message("foo1", "a.b.c")
val r2 = Message("foo2", "d.e.f")

val records = Seq(r1, r2)
val df = spark.createDataFrame(records)

df.withColumn("col1", split(col("text"), "\\.").getItem(0))
  .withColumn("col2", split(col("text"), "\\.").getItem(1))
  .withColumn("col3", split(col("text"), "\\.").getItem(2))
  .show(false)

+------+-----+----+----+----+
|others|text |col1|col2|col3|
+------+-----+----+----+----+
|foo1  |a.b.c|a   |b   |c   |
|foo2  |d.e.f|d   |e   |f   |
+------+-----+----+----+----+
Run Code Online (Sandbox Code Playgroud)

更新:我强烈建议使用Psidom的实现来避免分裂三次.


小智 5

这会将列附加到原始DataFrame并且不使用select,并且仅使用临时列拆分一次:

import spark.implicits._

df.withColumn("_tmp", split($"columnToSplit", "\\."))
  .withColumn("col1", $"_tmp".getItem(0))
  .withColumn("col2", $"_tmp".getItem(1))
  .withColumn("col3", $"_tmp".getItem(2))
  .drop("_tmp")
Run Code Online (Sandbox Code Playgroud)


Pow*_*ers 5

这扩展了 Psidom 的答案,并展示了如何动态进行分割,而不需要对列数进行硬编码。此答案运行查询来计算列数。

val df = Seq(
  "a.b.c",
  "d.e.f"
).toDF("my_str")
.withColumn("letters", split(col("my_str"), "\\."))

val numCols = df
  .withColumn("letters_size", size($"letters"))
  .agg(max($"letters_size"))
  .head()
  .getInt(0)

df
  .select(
    (0 until numCols).map(i => $"letters".getItem(i).as(s"col$i")): _*
  )
  .show()
Run Code Online (Sandbox Code Playgroud)