How to join two tables with aggregation

Mir*_*ira 5 sql arrays apache-spark apache-spark-sql pyspark

I have two pyspark dataframes:

The first is:

name start end
bob    1   3
john   5   8

The second is:

day outcome
  1   a
  2   c
  3   d
  4   a
  5   e
  6   c
  7   u
  8   l

I need the outcomes of each person's consecutive days concatenated, e.g.

bob  acd

john  ecul

Is it possible to do this in pyspark?

Shu*_*rma 6

Code

from pyspark.sql import functions as F

# non-equi join: attach every day that falls inside [start, end]
df = df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))
# collect (day, outcome) pairs per name; day is kept to preserve order
df = df.groupBy('name').agg(F.collect_list(F.struct('day', 'outcome')).alias('outcome'))
# sort by day (structs sort by their first field) and keep only outcome
df = df.withColumn('outcome', F.transform(F.array_sort('outcome'), lambda x: x.outcome))
# concatenate the letters into one string
df = df.withColumn('outcome', F.array_join('outcome', ''))
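The snippet assumes df1/df2 already exist. For completeness, a minimal setup sketch recreating the question's data (the SparkSession bootstrap is my assumption, not part of the answer):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# the two example dataframes from the question
df1 = spark.createDataFrame([("bob", 1, 3), ("john", 5, 8)],
                            ["name", "start", "end"])
df2 = spark.createDataFrame(
    [(1, "a"), (2, "c"), (3, "d"), (4, "a"),
     (5, "e"), (6, "c"), (7, "u"), (8, "l")],
    ["day", "outcome"])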

How does it work?

Join df1 and df2 using a non-equi join, so each day row attaches to the person whose [start, end] range contains it:

+----+-----+---+---+-------+
|name|start|end|day|outcome|
+----+-----+---+---+-------+
|bob |1    |3  |1  |a      |
|bob |1    |3  |2  |c      |
|bob |1    |3  |3  |d      |
|john|5    |8  |5  |e      |
|john|5    |8  |6  |c      |
|john|5    |8  |7  |u      |
|john|5    |8  |8  |l      |
+----+-----+---+---+-------+

Group the dataframe by name and collect the (day, outcome) pairs. This step is crucial: collect_list gives no ordering guarantee after a shuffle, so the day column must be carried along to restore the concatenation order later.

+----+--------------------------------+
|name|outcome                         |
+----+--------------------------------+
|john|[{5, e}, {6, c}, {7, u}, {8, l}]|
|bob |[{1, a}, {2, c}, {3, d}]        |
+----+--------------------------------+

Sort the array, then transform it to extract outcome:

+----+------------+
|name|outcome     |
+----+------------+
|john|[e, c, u, l]|
|bob |[a, c, d]   |
+----+------------+
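As a side note, array_sort on an array of structs compares the struct fields in declaration order, so the first field, day, drives the sort. A tiny standalone sketch of that behavior (the demo data is hypothetical; spark is the session from the setup sketch above):

# array of (day, outcome) structs, deliberately out of order
demo = spark.createDataFrame(
    [([(2, 'b'), (1, 'a')],)],
    'pairs: array<struct<day:int,outcome:string>>')
demo.select(
    F.transform(F.array_sort('pairs'), lambda x: x.outcome).alias('o')
).show()
# prints [a, b]: the (1, 'a') struct sorts first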

Concatenate the array to get the result:

+----+-------+
|name|outcome|
+----+-------+
|john|ecul   |
|bob |acd    |
+----+-------+


lef*_*oin 3

Using Spark SQL. I'm using Scala, but the SQL in pyspark is exactly the same, and I believe you can easily convert it if anything differs in pyspark.

Join the two dataframes, use collect_list() to collect the array of outcomes, then concatenate the array into a string with concat_ws():

val dF1 = Seq(
  ("bob", 1, 3),
  ("john", 5, 8)
).toDF("name", "start", "end")

dF1.createOrReplaceTempView("dF1")

val dF2 = Seq(
  (1, "a"),
  (2, "c"),
  (3, "d"),
  (4, "a"),
  (5, "e"),
  (6, "c"),
  (7, "u"),
  (8, "l")
).toDF("day", "outcome")

dF2.createOrReplaceTempView("dF2")


spark.sql(""" 
select d1.name, concat_ws('',collect_list(d2.outcome)) outcome
from
(select d1.name, e.day 
  from dF1 d1 
       lateral view explode(sequence(d1.start, d1.end)) e as day
)d1
left join dF2 d2 on d1.day=d2.day
group by d1.name
""").show(100, false)
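As noted above, the SQL string carries over to pyspark unchanged; a hedged sketch of what the conversion might look like (only the dataframe construction differs):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.createDataFrame([("bob", 1, 3), ("john", 5, 8)],
                      ["name", "start", "end"]).createOrReplaceTempView("dF1")
spark.createDataFrame([(1, "a"), (2, "c"), (3, "d"), (4, "a"),
                       (5, "e"), (6, "c"), (7, "u"), (8, "l")],
                      ["day", "outcome"]).createOrReplaceTempView("dF2")

spark.sql("""
select d1.name, concat_ws('', collect_list(d2.outcome)) outcome
from
(select d1.name, e.day
  from dF1 d1
       lateral view explode(sequence(d1.start, d1.end)) e as day
)d1
left join dF2 d2 on d1.day = d2.day
group by d1.name
""").show(100, False)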

Result:

+----+-------+
|name|outcome|
+----+-------+
|bob |acd    |
|john|ecul   |
+----+-------+

Fix for the OOM: the lateral view explode materializes a row for every day in each range, which can exhaust memory when the ranges are wide; joining directly on the range condition avoids that:

spark.sql(""" 
select d1.name, concat_ws('',collect_list(d2.outcome)) outcome
from dF1 d1 
left join dF2 d2 on d1.start <= d2.day and d1.end >= d2.day
group by d1.name
""").show(100, false)