Col*_*ham 11 python pandas pyspark
表A有许多列带有日期列,表B有一个日期时间和一个值。两个表中的数据都是零星生成的,没有固定的时间间隔。桌子A很小,桌子B很大。
我需要加入B到A的条件下,给定元素a的A.datetime对应
B[B['datetime'] <= a]]['datetime'].max()
Run Code Online (Sandbox Code Playgroud)
有几种方法可以做到这一点,但我想要最有效的方法。
将小数据集广播为 Pandas DataFrame。设置一个 Spark UDF,使用merge_asof.
使用 Spark SQL 的广播连接功能:在以下条件下设置 theta 连接
B['datetime'] <= A['datetime']
Run Code Online (Sandbox Code Playgroud)
然后消除所有多余的行。
选项 B 看起来很糟糕……但是请告诉我第一种方法是否有效或者是否有另一种方法。
编辑:这是示例输入和预期输出:
A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
| A |2019-02-03|
| B |2019-03-14|
+---------+----------+
B =
+---------+----------+
| Key | Datetime |
+---------+----------+
| 0 |2019-01-01|
| 1 |2019-01-15|
| 2 |2019-02-01|
| 3 |2019-02-15|
| 4 |2019-03-01|
| 5 |2019-03-15|
+---------+----------+
custom_join(A,B) =
+---------+----------+
| Column1 | Key |
+---------+----------+
| A | 2 |
| B | 4 |
+---------+----------+
Run Code Online (Sandbox Code Playgroud)
#### For Example:
from pyspark.sql import SparkSession, Row, DataFrame
import pandas as pd
spark = SparkSession.builder.master("local").getOrCreate()
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string"
).show()
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
+--------+---+---+---+
| time| id| v1| v2|
+--------+---+---+---+
|20000101| 1|1.0| x|
|20000102| 1|3.0| x|
|20000101| 2|2.0| y|
|20000102| 2|4.0| y|
+--------+---+---+---+
Run Code Online (Sandbox Code Playgroud)