sum*_*tsu 2 distributed-computing apache-spark rdd pyspark apache-spark-mllib
假设存在类似于以下元组的 RDD:
(key1, 1)
(key3, 9)
(key2, 3)
(key1, 4)
(key1, 5)
(key3, 2)
(key2, 7)
...
Run Code Online (Sandbox Code Playgroud)
计算与每个键对应的统计信息的最有效(并且理想情况下是分布式)方法是什么?(目前,我希望计算标准偏差/方差,特别是。)据我所知,我的选择是:
colStatsMLLib 中的函数:mllib.stat如果认为需要其他统计计算,这种方法的优点是易于适应以后使用其他函数。但是,它在Vector包含每列数据的 RDD 上运行,据我所知,这种方法需要在单个节点上收集每个键的完整值集,这对于大数据来说似乎并不理想套。Spark 是否Vector总是暗示数据在Vector本地驻留在单个节点上?groupByKey, then stats:由于groupByKey操作的结果,可能是重洗牌。aggregateByKey,初始化一个 new StatCounter,并StatCounter::merge用作序列和组合器函数:这是StackOverflow answer 推荐的方法,避免了groupByKeyfrom 选项 2。但是,我StatCounter在 PySpark 中找不到好的文档。我喜欢选项 1,因为它使代码更具可扩展性,因为它可以使用其他具有类似合约的 MLLib 函数轻松适应更复杂的计算,但是如果Vector输入本身要求在本地收集数据集,那么它会限制数据大小哪些代码可以有效运行。在其他两个之间,选项 3看起来更有效,因为它避免了groupByKey,但我希望确认情况确实如此。
还有其他我没有考虑过的选择吗?(我目前正在使用 Python + PySpark,但如果存在语言差异,我也对 Java/Scala 中的解决方案持开放态度。)
你可以试试reduceByKey。如果我们只想计算min():
rdd.reduceByKey(lambda x,y: min(x,y)).collect()
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)]
Run Code Online (Sandbox Code Playgroud)
要计算mean,您首先需要创建(value, 1)我们用来计算sum和操作count中的元组reduceByKey。最后,我们将它们相互除以得出mean:
meanRDD = (rdd
.mapValues(lambda x: (x, 1))
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
.mapValues(lambda x: x[0]/x[1]))
meanRDD.collect()
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)]
Run Code Online (Sandbox Code Playgroud)
对于variance,您可以使用公式(sumOfSquares/count) - (sum/count)^2,我们按以下方式翻译:
varRDD = (rdd
.mapValues(lambda x: (1, x, x*x))
.reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))
.mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2)))
varRDD.collect()
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)]
Run Code Online (Sandbox Code Playgroud)
我使用 type 值double而不是int在虚拟数据中来准确说明计算平均值和方差:
rdd = sc.parallelize([("key1", 1.0),
("key3", 9.0),
("key2", 3.0),
("key1", 4.0),
("key1", 5.0),
("key3", 2.0),
("key2", 7.0)])
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4148 次 |
| 最近记录: |