Can*_*ice 2 matrix-multiplication pyspark
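Yesterday I asked a similar question, "Matrix multiplication between two RDD[Array[Double]] in Spark", but I have since decided to switch to pyspark for this. I have made some progress with loading and reformatting the data (see "Pyspark map from RDD of strings to RDD of list of doubles"), but the matrix multiplication is proving difficult. Let me first share my progress: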
matrix1.txt
1.2 3.4 2.3
2.3 1.1 1.5
3.3 1.8 4.5
5.3 2.2 4.5
9.3 8.1 0.3
4.5 4.3 2.1
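Sharing files is difficult, but this is what my matrix1.txt file looks like. It is a space-delimited text file containing the values of the matrix. Next is the code: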
# do the imports for pyspark and numpy
from pyspark import SparkConf, SparkContext
import numpy as np
# loadmatrix is a helper function used to read matrix1.txt and format
# from RDD of strings to RDD of list of floats
def loadmatrix(sc):
    data = sc.textFile("matrix1.txt").map(lambda line: line.split(' ')).map(lambda line: [float(x) for x in line])
    return(data)
# this is the function I am struggling with, it should take a line of the
# matrix (formatted as list of floats), compute an outer product with itself
def AtransposeA(line):
    # pseudocode for this would be...
    # outerprod = compute line * line^transpose
    # return(outerprod)
# here is the main body of my file
if __name__ == "__main__":
    # create the conf, sc objects, then use loadmatrix to read data
    conf = SparkConf().setAppName('SVD').setMaster('local')
    sc = SparkContext(conf = conf)
    mymatrix = loadmatrix(sc)
    # this is pseudocode for calling AtransposeA
    ATA = mymatrix.map(lambda line: AtransposeA(line)).reduce(elementwise add all the outerproducts)
    # the SVD of ATA is computed below
    U, S, V = np.linalg.svd(ATA)
    # ...
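My approach is as follows: to do the matrix multiplication A^T * A, I create a function that computes the outer product of a row of A with itself. The elementwise sum of all these outer products is the product I want. I then call AtransposeA() inside a map, so that it runs on every row of the matrix, and finally I use reduce() to add up the resulting matrices.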
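Written out, the identity this approach relies on is the following. The notation is introduced here only for illustration: a_i is the i-th row of A written as a column vector, m is the number of rows, and n is the number of columns.

```latex
A^{T}A \;=\; \sum_{i=1}^{m} a_i\, a_i^{T}, \qquad a_i \in \mathbb{R}^{n}
```

Each term a_i a_i^T is an n x n matrix, so the reduce step only ever has to add matrices of the same small shape, however many rows A has.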
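I am struggling to work out what the AtransposeA function should look like. How can I compute an outer product like this in pyspark? Thanks in advance for your help!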
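First, consider why you want to use Spark for this. It sounds like all of your data fits in memory, in which case you can use numpy and pandas in a very straightforward way.
If your data isn't structured so that rows are independent, then it probably can't be parallelized by sending groups of rows to different nodes, which is the whole point of using Spark.
That being said... here is some pyspark (2.1.1) code that I think does what you want.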
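# imports used below; `spark` is assumed to be an existing SparkSession,
# as provided by the pyspark shell
from functools import reduce
from pyspark.sql import functions as F
# read the matrix file
df = spark.read.csv("matrix1.txt", sep=" ", inferSchema=True)
df.show()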
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|1.2|3.4|2.3|
|2.3|1.1|1.5|
|3.3|1.8|4.5|
|5.3|2.2|4.5|
|9.3|8.1|0.3|
|4.5|4.3|2.1|
+---+---+---+
# do the sum of the multiplication that we want, and get
# one data frame for each column
colDFs = []
for c2 in df.columns:
    colDFs.append(df.select([F.sum(df[c1] * df[c2]).alias("op_{0}".format(i)) for i, c1 in enumerate(df.columns)]))
# now union those separate data frames to build the "matrix"
mtxDF = reduce(lambda a, b: a.select(a.columns).union(b.select(a.columns)), colDFs)
mtxDF.show()
+------------------+------------------+------------------+
|              op_0|              op_1|              op_2|
+------------------+------------------+------------------+
|            152.45|118.88999999999999|             57.15|
|118.88999999999999|104.94999999999999|             38.93|
|             57.15|             38.93|52.540000000000006|
+------------------+------------------+------------------+
This appears to be the same result you get from numpy.
import numpy
a = numpy.genfromtxt("matrix1.txt")
numpy.dot(a.T, a)
array([[ 152.45,  118.89,   57.15],
       [ 118.89,  104.95,   38.93],
       [  57.15,   38.93,   52.54]])
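For the map/reduce formulation described in the question, a minimal sketch of the per-row function might look like the following. It assumes that numpy.outer is an acceptable way to form the outer product, and it uses a local Python list of the matrix1.txt rows as a stand-in for the RDD so that it runs without a Spark cluster. The name `rows` is introduced here for illustration only.

```python
from functools import reduce

import numpy as np


def AtransposeA(line):
    """Return the outer product of one matrix row with itself (n x n array)."""
    row = np.asarray(line, dtype=float)
    return np.outer(row, row)


# Rows of matrix1.txt from the question, held in a local list (illustrative
# stand-in for the RDD returned by loadmatrix(sc)).
rows = [
    [1.2, 3.4, 2.3],
    [2.3, 1.1, 1.5],
    [3.3, 1.8, 4.5],
    [5.3, 2.2, 4.5],
    [9.3, 8.1, 0.3],
    [4.5, 4.3, 2.1],
]

# Elementwise sum of all the outer products gives A^T A.
# With the RDD from the question, the analogous call would presumably be:
#   ATA = mymatrix.map(AtransposeA).reduce(lambda a, b: a + b)
ATA = reduce(lambda a, b: a + b, map(AtransposeA, rows))

# SVD of the small n x n result, computed locally with numpy.
U, S, V = np.linalg.svd(ATA)
print(ATA)
```

Note that numpy.linalg.svd returns the third factor already transposed, so `V` here holds V^T. Because every partial result is only n x n, the reduce stays cheap however many rows there are, which is what makes this formulation suitable for a tall, narrow matrix.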