Pyspark:使用具有多种Critiria的repartitionAndSortWithinPartitions

Mpi*_*ris 4 python apache-spark pyspark

假设我有以下RDD:

rdd = sc.parallelize([('a', (5,1)), ('d', (8,2)), ('2', (6,3)), ('a', (8,2)), ('d', (9,6)), ('b', (3,4)),('c', (8,3))])
Run Code Online (Sandbox Code Playgroud)

如何使用repartitionAndSortWithinPartitionsx [0]和x [1] [0]后排序.使用以下内容我只按键(x [0])排序:

Npartitions = sc.defaultParallelism
rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: hash(x) % Npartitions, 2)
Run Code Online (Sandbox Code Playgroud)

这样做的方法如下,但我认为应该更简单:

Npartitions = sc.defaultParallelism 
partitioned_data = rdd
  .partitionBy(2)
  .map(lambda x:(x[0],x[1][0],x[1][1]))
  .toDF(['letter','number2','number3'])
  .sortWithinPartitions(['letter','number2'],ascending=False)
  .map(lambda x:(x.letter,(x.number2,x.number3)))

>>> partitioned_data.glom().collect()

[[],
[(u'd', (9, 6)), (u'd', (8, 2))],
[(u'c', (8, 3)), (u'c', (6, 3))],
[(u'b', (3, 4))],
[(u'a', (8, 2)), (u'a', (5, 1))]
Run Code Online (Sandbox Code Playgroud)

可以看出,我必须将其转换为Dataframe才能使用sortWithinPartitions.还有另外一种方法吗?用repartitionAndSortWIthinPartitions

(数据不是全局排序并不重要.我只关心在分区内进行排序.)

zer*_*323 10

这是可能的,但您必须在复合键中包含所有必需的信息:

from pyspark.rdd import portable_hash

n = 2

def partitioner(n):
    """Partition by the first item in the key tuple"""
    def partitioner_(x):
        return portable_hash(x[0]) % n
    return partitioner_


(rdd
  .keyBy(lambda kv: (kv[0], kv[1][0]))  # Create temporary composite key
  .repartitionAndSortWithinPartitions(
      numPartitions=n, partitionFunc=partitioner(n), ascending=False)
  .map(lambda x: x[1]))  # Drop key (note: there is no partitioner set anymore)
Run Code Online (Sandbox Code Playgroud)

逐步说明:

如前所述,您可以重塑数据:

rdd.map(lambda kv: ((kv[0], kv[1][0]), kv[1][1]))
Run Code Online (Sandbox Code Playgroud)

并放弃最终map以提高性能.