在PySpark中进行排序缩减的最有效方法是什么?

rju*_*ney 7 python mapreduce python-2.7 apache-spark pyspark

我正在分析2015年美国国内航班的准时性能记录.我需要按尾号分组,并将每个尾号的所有航班的日期分类列表存储在数据库中,以便我的应用程序检索.我不确定实现这一目标的两个选项中哪一个是最好的选择.

# Load the parquet file
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')

# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(lambda x: 
  (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
  )
Run Code Online (Sandbox Code Playgroud)

我可以通过减少排序来实现这一目标......

# Group flights by tail number, sorted by date, then flight number, then 
origin/dest
flights_per_airplane = flights\
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
  .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4])))
Run Code Online (Sandbox Code Playgroud)

或者我可以在随后的地图工作中实现它......

# Do same in a map step, more efficient or does pySpark know how to optimize the above?
flights_per_airplane = flights\
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
  .reduceByKey(lambda a, b: a + b)\
  .map(lambda tuple: 
    (
      tuple[0], sorted(tuple[1], key=lambda x: (x[1],x[2],x[3],x[4])))
    )
Run Code Online (Sandbox Code Playgroud)

在reduce中执行此操作似乎效率非常低,但实际上两者都非常慢.sorted()看起来像在PySpark文档中这样做的方式,所以我想知道PySpark是不是在内部使这个犹太洁食?由于某些其他原因,哪个选项最有效或最佳选择?

我的代码在这里也是一个要点:https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade

如果您对数据感到好奇,请访问交通运输统计局,网址:http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID = 236&DB_Short_Name = On-Time

zer*_*323 4

不幸的是,在开始排序之前,这两种方法都是错误的,并且 Spark 中没有有效且简单的方法来执行此操作。尽管如此,第一个比另一个要差得多。

为什么这两种方式都是错误的?因为它只是另一个groupByKey,而且只是一个昂贵的操作。有一些方法可以尝试改进(尤其是避免地图边减少),但最终您只需要付出完全洗牌的代价,如果您没有看到任何失败,那么可能不值得所有的大惊小怪。

尽管如此,第二种方法在算法上要好得多*。如果您想像第一次尝试一样始终保持排序结构,您应该使用专用工具(aggregateByKeywithbisect.insort将是一个不错的选择),但这里实际上没有任何收获。

如果分组输出是硬性要求,那么您能做的最好的事情就是keyBy排序groupByKey。它不会比第二种解决方案提高性能,但可以说会提高可读性:

(flights
    .keyBy(lambda x: x[5])
    .groupByKey()
    .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5])))
Run Code Online (Sandbox Code Playgroud)

* 即使您假设Timsort的最佳情况,第一种方法也是 N 乘O(N) ,而在最坏情况下第二种方法是O(N log N) 。