pyspark,比较数据帧中的两行

ivy*_*wit 4 python apache-spark apache-spark-sql pyspark pyspark-sql

我试图将数据帧中的一行与下一行进行比较以查看时间戳的差异.目前的数据如下:

 itemid | eventid | timestamp
 ----------------------------
 134    | 30      | 2016-07-02 12:01:40
 134    | 32      | 2016-07-02 12:21:23
 125    | 30      | 2016-07-02 13:22:56
 125    | 32      | 2016-07-02 13:27:07
Run Code Online (Sandbox Code Playgroud)

我已经尝试将一个函数映射到数据帧上,以便进行这样的比较:(注意:我正在尝试获得差异大于4小时的行)

items = df.limit(10)\
          .orderBy('itemid', desc('stamp'))\
          .map(lambda x,y: (x.stamp - y.stamp) > 14400).collect()
Run Code Online (Sandbox Code Playgroud)

但是我收到以下错误:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe
Run Code Online (Sandbox Code Playgroud)

我认为这是由于我错误地使用了map功能.帮助使用地图或不同的解决方案将不胜感激.

更新: @ zero323的答案提供了有关我不正确使用映射的信息,但是我使用的系统在2.02之前运行Spark版本并且我正在使用Cassandra中的数据.

我设法用mapPartitions解决它.请参阅下面的答案.

更新(2017/03/27): 由于最初标记了这篇文章的答案,我对Spark的理解有了显着改善.我在下面更新了我的答案,以显示我当前的解决方案.

zer*_*323 9

是的,你正在map以错误的方式使用功能.map当时对单个元素进行操作.你可以尝试使用这样的窗口函数:

from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

df = (
    sc.parallelize([
        (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
        (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "eventid", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

w = Window.partitionBy("itemid").orderBy("timestamp")

diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")

df.withColumn("diff", diff)
Run Code Online (Sandbox Code Playgroud)


ivy*_*wit -1

@ShuaiYuan 对原答案的评论是正确的。在过去的一年里,我对 Spark 的工作原理有了更好的理解,并且实际上重写了我为这篇文章编写的程序。

新答案 (2017/03/27)
为了完成比较数据帧的两行,我最终使用了 RDD。我按键(在本例中为项目 id)对数据进行分组,并忽略 eventid,因为它与此等式无关。然后,我将 lambda 函数映射到行上,返回键的元组和包含事件间隙开始和结束的元组列表,该列表源自“findGaps”函数,该函数迭代链接的值列表(排序的时间戳)到每个键。完成后,我会过滤掉没有时间间隔的键,然后使用 flatMapValues 将数据返回为更像 sql 的格式。这是通过以下代码完成的:

# Find time gaps in list of datetimes where firings are longer than given duration.  
def findGaps(dates, duration):
    result = []
    length = len(dates)

    # convert to dates for comparison
    first = toDate(dates[0])
    last = toDate(dates[length - 1])
    for index, item in enumerate(dates):
        if index < length -1 and (dates[index + 1] - item).total_seconds() > duration:
            # build outage tuple and append to list
            # format (start, stop, duration)
            result.append(formatResult(item, dates[index + 1], kind))
    return result

outage_list = outage_join_df.rdd\
                            .groupByKey()\
                            .map(lambda row: (
                                     row[0],
                                     findGaps(
                                         sorted(list(row[1])), 
                                         limit
                                     )
                                  )
                            )\
                            .filter(lambda row: len(row[1]) > 0)\
                            .flatMapValues(lambda row: row)\
                            .map(lambda row: (
                                 row[0]['itemid'],     # itemid
                                 row[1][0].date(),     # date
                                 row[1][0],            # start
                                 row[1][1],            # stop
                                 row[1][2]             # duration
                            ))\
                            .collect()
Run Code Online (Sandbox Code Playgroud)

原始答案(错误)
我设法使用mapPartitions解决它:

def findOutage(items):
    outages = []

    lastStamp = None
    for item in items:
        if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
            outages.append({"item": item.itemid, 
                            "start": item.stamp.isoformat(),
                            "stop": lastStamp.isoformat()})
        lastStamp = item.stamp
    return iter(outages)

items = df.limit(10).orderBy('itemid', desc('stamp'))

outages = items.mapPartitions(findOutage).collect()
Run Code Online (Sandbox Code Playgroud)

感谢大家的帮助!