在python中按时联接两个Spark数据帧(TimestampType)

Ole*_*siy 5 join apache-spark apache-spark-sql pyspark

我有两个数据框,我想基于一个列将它们连接起来,但需要注意的是,该列是一个时间戳,并且该时间戳必须在一定的偏移量(5秒)内才能连接记录。更具体地讲,dates_dfwith中的记录date=1/3/2015:00:00:00应该与events_dfwith 一起加入,time=1/3/2015:00:00:01因为两个时间戳都在5秒钟之内。

我正在尝试让这种逻辑与python spark一起使用,这非常痛苦。人们如何像这样在火花中加入?

我的方法是向其添加两个额外的列,以5秒的偏移量dates_df确定lower_timestampupper_timestamp边界,并执行条件连接。这就是失败的地方,更具体地说:

joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)

joined_df.explain()
Run Code Online (Sandbox Code Playgroud)

仅捕获查询的最后一部分:

Filter (time#6 < upper_timestamp#4)
 CartesianProduct
 ....
Run Code Online (Sandbox Code Playgroud)

这给我一个错误的结果。

我是否真的需要对每个不等式进行完全笛卡尔联接,并在进行过程中删除重复项?

这是完整的代码:

from datetime import datetime, timedelta

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf


master = 'local[*]'
app_name = 'stackoverflow_join'

conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

def lower_range_func(x, offset=5):
    return x - timedelta(seconds=offset)

def upper_range_func(x, offset=5):
    return x + timedelta(seconds=offset)


lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())

dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)

dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)

dates_df.show()

# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
           withColumn('upper_timestamp', upper_range(dates_df['date']))


event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)

events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)

events_df.show()

# finally, join the data
joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)    

joined_df.show()
Run Code Online (Sandbox Code Playgroud)

我得到以下输出:

+-----+--------------------+
| name|                date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+

+--------------------+-------+
|                time|  event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+


+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name|                date|     lower_timestamp|     upper_timestamp|                time|  event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
Run Code Online (Sandbox Code Playgroud)

Ole*_*siy 6

我确实使用了SQL查询explain()来查看它是如何完成的,并在python中复制了相同的行为。首先,这里是如何使用SQL Spark进行操作:

dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time and  events.time < dates.upper_timestamp")
results.explain()
Run Code Online (Sandbox Code Playgroud)

这可行,但是问题是关于如何在python中执行此操作,因此解决方案似乎只是普通联接,然后是两个过滤器:

joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)
Run Code Online (Sandbox Code Playgroud)

joined_df.explain()产生与sql spark相同的查询,results.explain()因此我假设这是完成的方式。