Ken*_*evi 4 python tuples apache-spark rdd pyspark
我对 Python 中的 Apache Spark 比较陌生,这就是我正在尝试做的事情。我输入的数据如下。
rdd_row是行索引 (i) 的 RDD,rdd_col是列索引 (j) 的 RDD,rdd_values是 Values (v) 的 RDD。上面三个RDD都很大。
我正在尝试将它们转换为稀疏 rdd 矩阵
rdd_mat= ([rdd_row],[rdd_col],[rdd_values])
Run Code Online (Sandbox Code Playgroud)
IE,
rdd_mat=([i1,i2,i3 ..],[j1,j2,j3..], [v1,v2,v3 ..])
Run Code Online (Sandbox Code Playgroud)
我努力了:
zip where rdd_row.zip(rdd_col).zip(rdd_val)
Run Code Online (Sandbox Code Playgroud)
但它最终给出了
[(i1,j1,v1),(i2,j2,v2) ..]
Run Code Online (Sandbox Code Playgroud)
和
rdd1.union(rdd2)
Run Code Online (Sandbox Code Playgroud)
不会创建元组。
非常感谢帮助引导我走向正确的方向!
不幸的是,在这一点上(Spark 1.4)如果您对线性代数感兴趣,Scala 和 Java 是比 Python 更好的选择。假设您输入如下:
import numpy as np
np.random.seed(323)
rdd_row = sc.parallelize([0, 1, 1, 2, 3])
rdd_col = sc.parallelize([1, 2, 3, 4, 4])
rdd_vals = sc.parallelize(np.random.uniform(0, 1, size=5))
Run Code Online (Sandbox Code Playgroud)
要获得rdd_mat所需的形状,您可以执行以下操作:
assert rdd_row.count() == rdd_col.count() == rdd_vals.count()
rdd_mat = sc.parallelize(
(rdd_row.collect(), rdd_row.collect(), rdd_vals.collect()))
Run Code Online (Sandbox Code Playgroud)
但这是一个相当糟糕的主意。正如 @DeanLa 已经提到的,这里的并行处理非常有限,更不用说每个部分(例如整个行列表)都将最终出现在单个分区/节点上。
如果不知道您想如何使用输出,就很难给您有意义的建议,但一种方法是使用如下所示的内容:
from pyspark.mllib.linalg import Vectors
coords = (rdd_row.
zip(rdd_col).
zip(rdd_vals).
map(lambda ((row, col), val): (row, col, val)).
cache())
ncol = coords.map(lambda (row, col, val): col).distinct().count()
rows = (coords.
groupBy(lambda (row, col, val): row)
.mapValues(lambda values: Vectors.sparse(
ncol, sorted((col, val) for (row, col, val) in values))))
Run Code Online (Sandbox Code Playgroud)
它将创建一个 rdd 对,表示给定行的行索引和值的稀疏向量。如果您添加一些连接或按列添加组,您可以自己实现一些典型的线性代数例程,但对于功能齐全的分布式数据结构,最好使用 Scala / Java CoordinationMatrix或来自的其他类org.apache.spark.mllib.linalg.distributed
| 归档时间: |
|
| 查看次数: |
1585 次 |
| 最近记录: |