Mat*_*rer 30 scala apache-spark
我在Spark中使用scala有一个数据框,它有一个我需要拆分的列.
scala> test.show
+-------------+
|columnToSplit|
+-------------+
| a.b.c|
| d.e.f|
+-------------+
Run Code Online (Sandbox Code Playgroud)
我需要将此列拆分为如下所示:
+--------------+
|col1|col2|col3|
| a| b| c|
| d| e| f|
+--------------+
Run Code Online (Sandbox Code Playgroud)
我正在使用Spark 2.0.0
谢谢
小智 68
尝试:
df.withColumn("_tmp", split($"columnToSplit", "\\.")).select(
$"_tmp".getItem(0).as("col1"),
$"_tmp".getItem(1).as("col2"),
$"_tmp".getItem(2).as("col3")
).drop("_tmp")
Run Code Online (Sandbox Code Playgroud)
Psi*_*dom 30
要以编程方式做到这一点,你可以创建一个表达式序列(0 until 3).map(i => col("temp").getItem(i).as(s"col$i"))(假设你需要3列,结果),然后将其应用到select与: _*语法:
df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
(0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+----+----+----+
|col0|col1|col2|
+----+----+----+
| a| b| c|
| d| e| f|
+----+----+----+
Run Code Online (Sandbox Code Playgroud)
保留所有列:
df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+-------------+---------+----+----+----+
|columnToSplit| temp|col0|col1|col2|
+-------------+---------+----+----+----+
| a.b.c|[a, b, c]| a| b| c|
| d.e.f|[d, e, f]| d| e| f|
+-------------+---------+----+----+----+
Run Code Online (Sandbox Code Playgroud)
如果您正在使用pyspark,请使用列表推导来替换mapin scala:
df = spark.createDataFrame([['a.b.c'], ['d.e.f']], ['columnToSplit'])
from pyspark.sql.functions import col, split
(df.withColumn('temp', split('columnToSplit', '\\.'))
.select(*(col('temp').getItem(i).alias(f'col{i}') for i in range(3))
).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
| a| b| c|
| d| e| f|
+----+----+----+
Run Code Online (Sandbox Code Playgroud)
Sas*_*ter 17
避免选择部分的解决方案.当您只想追加新列时,这很有用:
case class Message(others: String, text: String)
val r1 = Message("foo1", "a.b.c")
val r2 = Message("foo2", "d.e.f")
val records = Seq(r1, r2)
val df = spark.createDataFrame(records)
df.withColumn("col1", split(col("text"), "\\.").getItem(0))
.withColumn("col2", split(col("text"), "\\.").getItem(1))
.withColumn("col3", split(col("text"), "\\.").getItem(2))
.show(false)
+------+-----+----+----+----+
|others|text |col1|col2|col3|
+------+-----+----+----+----+
|foo1 |a.b.c|a |b |c |
|foo2 |d.e.f|d |e |f |
+------+-----+----+----+----+
Run Code Online (Sandbox Code Playgroud)
更新:我强烈建议使用Psidom的实现来避免分裂三次.
小智 5
这会将列附加到原始DataFrame并且不使用select,并且仅使用临时列拆分一次:
import spark.implicits._
df.withColumn("_tmp", split($"columnToSplit", "\\."))
.withColumn("col1", $"_tmp".getItem(0))
.withColumn("col2", $"_tmp".getItem(1))
.withColumn("col3", $"_tmp".getItem(2))
.drop("_tmp")
Run Code Online (Sandbox Code Playgroud)
这扩展了 Psidom 的答案,并展示了如何动态进行分割,而不需要对列数进行硬编码。此答案运行查询来计算列数。
val df = Seq(
"a.b.c",
"d.e.f"
).toDF("my_str")
.withColumn("letters", split(col("my_str"), "\\."))
val numCols = df
.withColumn("letters_size", size($"letters"))
.agg(max($"letters_size"))
.head()
.getInt(0)
df
.select(
(0 until numCols).map(i => $"letters".getItem(i).as(s"col$i")): _*
)
.show()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
49587 次 |
| 最近记录: |