如何在 PySpark 中创建 merge_asof 功能?

Col*_*ham 11 python pandas pyspark

A有许多列带有日期列,表B有一个日期时间和一个值。两个表中的数据都是零星生成的,没有固定的时间间隔。桌子A很小,桌子B很大。

我需要加入BA的条件下,给定元素aA.datetime对应

B[B['datetime'] <= a]]['datetime'].max()
Run Code Online (Sandbox Code Playgroud)

有几种方法可以做到这一点,但我想要最有效的方法。

选项1

将小数据集广播为 Pandas DataFrame。设置一个 Spark UDF,使用merge_asof.

选项 2

使用 Spark SQL 的广播连接功能:在以下条件下设置 theta 连接

B['datetime'] <= A['datetime']
Run Code Online (Sandbox Code Playgroud)

然后消除所有多余的行。

选项 B 看起来很糟糕……但是请告诉我第一种方法是否有效或者是否有另一种方法。

编辑:这是示例输入和预期输出:

A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
|    A    |2019-02-03|
|    B    |2019-03-14|
+---------+----------+

B =
+---------+----------+
|   Key   | Datetime |
+---------+----------+
|    0    |2019-01-01|
|    1    |2019-01-15|
|    2    |2019-02-01|
|    3    |2019-02-15|
|    4    |2019-03-01|
|    5    |2019-03-15|
+---------+----------+

custom_join(A,B) =
+---------+----------+
| Column1 |   Key    |
+---------+----------+
|    A    |     2    |
|    B    |     4    |
+---------+----------+
Run Code Online (Sandbox Code Playgroud)

Arr*_*uff 2

任何尝试在 pyspark 3.x 中执行此操作的人都可以使用

申请在Pandas

#### For Example:

  from pyspark.sql import SparkSession, Row, DataFrame
  import pandas as pd
  spark = SparkSession.builder.master("local").getOrCreate()

  df1 = spark.createDataFrame(
      [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
      ("time", "id", "v1"))
  df2 = spark.createDataFrame(
      [(20000101, 1, "x"), (20000101, 2, "y")],
      ("time", "id", "v2"))
  def asof_join(l, r):
      return pd.merge_asof(l, r, on="time", by="id")
  df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
      asof_join, schema="time int, id int, v1 double, v2 string"
  ).show()


  >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
  +--------+---+---+---+
  |    time| id| v1| v2|
  +--------+---+---+---+
  |20000101|  1|1.0|  x|
  |20000102|  1|3.0|  x|
  |20000101|  2|2.0|  y|
  |20000102|  2|4.0|  y|
  +--------+---+---+---+
Run Code Online (Sandbox Code Playgroud)