Mir*_*ira 5 sql arrays apache-spark apache-spark-sql pyspark
I have two pyspark dataframes.
The first is:
name start end
bob 1 3
john 5 8
The second is:
day outcome
1 a
2 c
3 d
4 a
5 e
6 c
7 u
8 l
I need the outcomes for each person's consecutive days concatenated, e.g.:
bob acd
john ecul
Is it possible to do this in pyspark?
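For reference, here is the desired transformation expressed in plain Python (no Spark), using the sample data from the question, so the expected output is easy to verify:

```python
# Plain-Python reference for the desired transformation (not Spark).
people = [("bob", 1, 3), ("john", 5, 8)]  # name, start, end
outcomes = {1: "a", 2: "c", 3: "d", 4: "a", 5: "e", 6: "c", 7: "u", 8: "l"}

# For each person, concatenate the outcomes of days start..end inclusive.
result = {name: "".join(outcomes[d] for d in range(start, end + 1))
          for name, start, end in people}
print(result)  # {'bob': 'acd', 'john': 'ecul'}
```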
from pyspark.sql import functions as F

# Non-equi join: attach every day in [start, end] to the matching person
df = df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))
# Collect (day, outcome) structs per person
df = df.groupBy('name').agg(F.collect_list(F.struct('day', 'outcome')).alias('outcome'))
# Sort the structs by day, then keep only the outcome field
df = df.withColumn('outcome', F.transform(F.array_sort('outcome'), lambda x: x.outcome))
# Concatenate the array into a single string
df = df.withColumn('outcome', F.array_join('outcome', ''))
Join the dataframes df1 and df2 using a non-equi join:
+----+-----+---+---+-------+
|name|start|end|day|outcome|
+----+-----+---+---+-------+
|bob |1 |3 |1 |a |
|bob |1 |3 |2 |c |
|bob |1 |3 |3 |d |
|john|5 |8 |5 |e |
|john|5 |8 |6 |c |
|john|5 |8 |7 |u |
|john|5 |8 |8 |l |
+----+-----+---+---+-------+
Group by name and collect the day and outcome pairs. This step is crucial: collect_list does not guarantee order, so the day column is needed to restore the concatenation order later.
+----+--------------------------------+
|name|outcome |
+----+--------------------------------+
|john|[{5, e}, {6, c}, {7, u}, {8, l}]|
|bob |[{1, a}, {2, c}, {3, d}] |
+----+--------------------------------+
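Why sorting the array of `{day, outcome}` structs works: array_sort compares structs field by field, so the first field (day) dominates. A plain-Python analogue of this ordering guarantee, using tuples (which also compare element by element):

```python
# Python analogue of array_sort on {day, outcome} structs:
# tuples compare element-by-element, so sorting orders by day first.
collected = [(6, "c"), (5, "e"), (8, "l"), (7, "u")]  # order collect_list might return
ordered = [outcome for _day, outcome in sorted(collected)]
print("".join(ordered))  # ecul
```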
Sort the array, then transform it to extract the outcome values:
+----+------------+
|name|outcome |
+----+------------+
|john|[e, c, u, l]|
|bob |[a, c, d] |
+----+------------+
Join the array elements to get the result:
+----+-------+
|name|outcome|
+----+-------+
|john|ecul |
|bob |acd |
+----+-------+
Using Spark SQL. I am using Scala, but the SQL is exactly the same in pyspark; if there are any differences, I'm sure you can convert it easily.
Join the two dataframes, use collect_list() to get an array of outcomes, then concat_ws() to concatenate the array into a string:
val dF1 = Seq(
  ("bob", 1, 3),
  ("john", 5, 8)
).toDF("name", "start", "end")
dF1.createOrReplaceTempView("dF1")

val dF2 = Seq(
  (1, "a"),
  (2, "c"),
  (3, "d"),
  (4, "a"),
  (5, "e"),
  (6, "c"),
  (7, "u"),
  (8, "l")
).toDF("day", "outcome")
dF2.createOrReplaceTempView("dF2")

spark.sql("""
  select d1.name, concat_ws('', collect_list(d2.outcome)) outcome
    from (select d1.name, e.day
            from dF1 d1
                 lateral view explode(sequence(d1.start, d1.end)) e as day
         ) d1
         left join dF2 d2 on d1.day = d2.day
   group by d1.name
""").show(100, false)
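The subquery above expands each (name, start, end) row into one row per day via sequence() + lateral view explode, which turns the range join into a plain equi-join on day. A plain-Python sketch of that explode-then-join shape, on the same sample data:

```python
# Python analogue of the sequence()/explode + equi-join in the SQL above (not Spark).
df1 = [("bob", 1, 3), ("john", 5, 8)]
df2 = {1: "a", 2: "c", 3: "d", 4: "a", 5: "e", 6: "c", 7: "u", 8: "l"}

# lateral view explode(sequence(start, end)): one (name, day) row per day
exploded = [(name, day) for name, start, end in df1
                        for day in range(start, end + 1)]

# equi-join on day, then group by name and concatenate
result = {}
for name, day in exploded:
    result[name] = result.get(name, "") + df2[day]
print(result)  # {'bob': 'acd', 'john': 'ecul'}
```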
Result:
+----+-------+
|name|outcome|
+----+-------+
|bob |acd |
|john|ecul |
+----+-------+
Fix for the OOM, using a non-equi join directly:
spark.sql("""
  select d1.name, concat_ws('', collect_list(d2.outcome)) outcome
    from dF1 d1
         left join dF2 d2 on d1.start <= d2.day and d1.end >= d2.day
   group by d1.name
""").show(100, false)