在Spark中对可迭代值进行排序

Joe*_*Joe 4 apache-spark

假设我有以下输入数据:

["example.com", Date(2000, 1, 1)] : 100,
["example.com", Date(2000, 2, 1)]: 30,
["example.com", Date(2000, 3, 1)]: 5, 
["xyz.com", Date(2000, 1, 1)]: 20,
["xyz.com", Date(2000, 2, 1)]: 10,
["xyz.com", Date(2000, 3, 1)]: 60]
Run Code Online (Sandbox Code Playgroud)

我想按日期(降序)分组,然后按计数排序,从而给我每个日期的域的有序列表。

我要结束于:

Date(2000, 1, 1), [["example.com", 100], ["xyz.com", 20]]
Date(2000, 2, 1), [["example.com", 30], ["xyz.com", 10]]
Date(2000, 3, 1), [["xyz.com", 60], ["example.com", 5]]
Run Code Online (Sandbox Code Playgroud)

看来这是一个正常的用例,但是我在编程指南中看不到这样做的方法。

我可以 map [[domain, date] count] -> [date, [domain, count]]

这会给我(K, V)

Date(2000, 1, 1), ["example.com", 100],
Date(2000, 2, 1), ["example.com", 30],
Date(2000, 3, 1), ["example.com", 5], 
Date(2000, 1, 1), ["xyz.com", 20],
Date(2000, 2, 1), ["xyz.com", 10],
Date(2000, 3, 1), ["xyz.com", 60]
Run Code Online (Sandbox Code Playgroud)

然后groupByKey,给我(K, Iterable<V>)

[Date(2000, 1, 1), [["example.com", 100], ["xyz.com", 20]]
[Date(2000, 2, 1), [["example.com", 30], ["xyz.com", 10]]
[Date(2000, 3, 1), [["example.com", 5], ["xyz.com", 60]]
Run Code Online (Sandbox Code Playgroud)

然后如何在键中排序?

请原谅伪代码,我使用的是Flambo Clojure包装器,我不想为了问这个问题而用Java重写它!

编辑:每个Iterable(即域列表)可能会太大而无法容纳在内存中。

EDIT2:这就是所有伪代码。我使用月份名称来使其易于阅读,但为了清楚起见,我将其更改为实际日期。

Sea*_*wen 5

概括地说,我将执行以下操作。(由于我没有编译它,所以可能不是100%正确,而是关闭了。)为简单起见,我假设您以开头RDD[((String,String),Int)]

首先,groupBy一个月,类似于:

.groupBy { case ((_, month), _) => month }
Run Code Online (Sandbox Code Playgroud)

并删除值中的月份:

.mapValues(_.map { case ((domain, _), count) => (domain, count) })
Run Code Online (Sandbox Code Playgroud)

如果需要按月订购,请定义月订购:

def monthOfYear(month: String): Int = 
  month match {
     case "January" => 1
     case "February" => 2
     ...
  }
Run Code Online (Sandbox Code Playgroud)

并按月份对RDD进行排序:

.sortBy { case (month, _) => monthOfYear(month) }
Run Code Online (Sandbox Code Playgroud)

并按降序对域进行排序:

.mapValues(_.toSeq.sortBy{ case (domain, count) => count }(Ordering[Int].reverse))
Run Code Online (Sandbox Code Playgroud)

这是直接且高效的,但存在一个问题,即一个月内的所有域计数对都必须适合内存。

取而代之的是,您可以按照递减的顺序重新开始:

.sortBy(p => p._2, false)
Run Code Online (Sandbox Code Playgroud)

然后按月分组。我尚未对此进行测试,并且我认为这种行为不能得到保证,但是我希望在实践中,即使在分组后也将按计数顺序遇到元素。