Large matrix operations: multiplication in Scala/Apache Spark

PTD*_*TDS 0 scala sparse-matrix large-data matrix-multiplication apache-spark

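I need to multiply two large matrices, X and Y. Typically X has ~500K rows and ~18K columns, and Y has ~18K rows and ~18K columns. Matrix X is expected to be sparse, and matrix Y is expected to be sparse or dense. What is the ideal way to perform this multiplication in Scala/Apache Spark?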
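For a sense of scale, here is a minimal sketch of how much memory each matrix would take if stored densely. It assumes one 8-byte `Double` per entry, no sparse compression and no JVM object overhead; `MatrixSizing`, `denseBytes` and `productShape` are hypothetical helpers, not part of any library. It also encodes the shape rule that an (m×k) matrix times a (k×n) matrix gives an (m×n) result.

```scala
// Hypothetical helpers for back-of-the-envelope sizing. They assume a dense
// layout with one 8-byte Double per entry and ignore JVM object overhead.
object MatrixSizing {
  val BytesPerDouble = 8L

  // Bytes needed to hold a dense rows x cols matrix of Doubles.
  def denseBytes(rows: Long, cols: Long): Long = rows * cols * BytesPerDouble

  // Shape of A * B for A of shape (m, k) and B of shape (k2, n); needs k == k2.
  def productShape(a: (Long, Long), b: (Long, Long)): (Long, Long) = {
    require(a._2 == b._1, s"inner dimensions differ: ${a._2} vs ${b._1}")
    (a._1, b._2)
  }
}

val xShape = (500000L, 18000L) // X: ~500K rows, ~18K columns
val yShape = (18000L, 18000L)  // Y: ~18K rows, ~18K columns
val zShape = MatrixSizing.productShape(xShape, yShape) // shape of X * Y

println(s"X dense:   ${MatrixSizing.denseBytes(xShape._1, xShape._2)} bytes")
println(s"Y dense:   ${MatrixSizing.denseBytes(yShape._1, yShape._2)} bytes")
println(s"X*Y dense: ${MatrixSizing.denseBytes(zShape._1, zShape._2)} bytes")
```

Under these assumptions a dense X needs 500,000 × 18,000 × 8 = 72,000,000,000 bytes (72 GB), a dense Y about 2.6 GB, and the 500K × 18K result another 72 GB, which is why sparse storage for X or streaming its rows matters here.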

Spi*_*Pig 10

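I've prepared some code for you. It represents a matrix as an array of column vectors (meaning each entry in the array is a column, not a row). Multiplying two 1000*1000 matrices takes about 0.7 seconds; two 10,000*10,000 matrices take 11 minutes; two 20,000*20,000 matrices take 1.5 hours; and (500k*18k) times (18k*18k) takes 18 hours. If you run it in parallel (using the commented-out code), it should run 2 to 3 times faster on a 4-core CPU. Keep in mind that the number of columns in the first matrix must always equal the number of rows in the second matrix.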

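class Matrix(val columnVectors: Array[Array[Double]]) {
  // Each element of columnVectors is one column of the matrix.
  val columns = columnVectors.size
  val rows = columnVectors.head.size

  // Matrix-vector product: sums column(col) * v(col) over all columns.
  def *(v: Array[Double]): Array[Double] = {
    require(v.size == columns, "vector length must equal the number of columns")
    val newValues = Array.ofDim[Double](rows)
    var col = 0
    while(col < columns) {
      val n = v(col)
      val column = columnVectors(col)
      var row = 0
      while(row < newValues.size) {
        newValues(row) += column(row) * n
        row += 1
      }
      col += 1
    }
    newValues
  }

  // Matrix-matrix product: column j of the result is this * (column j of other).
  def *(other: Matrix): Matrix = {
    require(columns == other.rows, "columns of the first matrix must equal rows of the second")
    //do the calculation on only one cpu
    new Matrix(other.columnVectors.map(col => this * col))

    //do the calculation in parallel on all available cpus
    //(on Scala 2.13+, .par needs the scala-parallel-collections module and
    // import scala.collection.parallel.CollectionConverters._)
    //new Matrix(other.columnVectors.par.map(col => this * col).toArray)
  }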
  override def toString = {
    columnVectors.transpose.map(_.mkString(", ")).mkString("\n")
  }
}
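The amount of arithmetic the column-vector code does is easy to estimate: its innermost loop runs once per (row, column of the first matrix, column of the second matrix) triple, i.e. m·k·n multiply-adds for an (m×k) by (k×n) product. The sketch below uses a hypothetical `mulAdds` helper to compute that count for the sizes mentioned; it counts only those operations and ignores memory traffic, parallelism and zero-skipping, so it is not a runtime prediction.

```scala
// Hypothetical helper: number of multiply-adds the naive dense algorithm
// performs for an (m x k) matrix times a (k x n) matrix.
// Ignores memory traffic, parallelism and any skipped zeros.
def mulAdds(m: Long, k: Long, n: Long): Long = m * k * n

val cases: Seq[(String, Long)] = Seq(
  ("1000 x 1000 times 1000 x 1000", mulAdds(1000L, 1000L, 1000L)),
  ("10,000 x 10,000 times 10,000 x 10,000", mulAdds(10000L, 10000L, 10000L)),
  ("20,000 x 20,000 times 20,000 x 20,000", mulAdds(20000L, 20000L, 20000L)),
  ("500k x 18k times 18k x 18k", mulAdds(500000L, 18000L, 18000L))
)

cases.foreach { case (label, ops) => println(s"$label: $ops multiply-adds") }
```

The target product needs 1.62 × 10^14 multiply-adds, 162 times as many as the 10,000 × 10,000 case.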

Edit:

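OK, here is a better version. The matrix now stores row vectors instead of column vectors, which makes it easier to optimize the multiplication when the first matrix is sparse. I also added a lazy version of matrix multiplication using iterators. Since the first matrix has 500k*18k = 9 billion numbers (about 72 GB as 8-byte doubles), the lazy version lets you do the multiplication without needing much RAM. You only need to create an Iterator that reads the rows lazily, for example from a database, and then write out the rows from the resulting iterator.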

import scala.util.{Random => rand}
// On Scala 2.13+, .par needs the scala-parallel-collections module and:
// import scala.collection.parallel.CollectionConverters._

def time[T](descr: String)(f: => T): T = {
  val start = System.nanoTime
  val r = f
  val end = System.nanoTime
  val time = (end - start)/1e6
  println(descr + ": time = " + time + "ms")
  r
}

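object Matrix {
  // Lazily multiplies the rows coming from m1 by m2. Rows are taken 8 at a
  // time and each batch is multiplied in parallel, so only a small batch of
  // rows has to be held in memory at once.
  def mulLazy(m1: Iterator[Array[Double]], m2: Matrix): Iterator[Array[Double]] = {
    m1.grouped(8).flatMap { group =>
      group.par.map(m2.mulRow).seq
    }
  }
}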

class Matrix(val rowVectors: Array[Array[Double]]) {
  val columns = rowVectors.head.size
  val rows = rowVectors.size

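  // Multiplies one row of the left-hand matrix (otherRow) by this matrix and
  // returns the corresponding row of the product.
  private def mulRow(otherRow: Array[Double]): Array[Double] = {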
    val rowVectors = this.rowVectors
    val result = Array.ofDim[Double](columns)
    var i = 0
    while(i < otherRow.size) {
      val value = otherRow(i)
      if(value != 0) { //optimization for sparse matrix
        val row = rowVectors(i)
        var col = 0
        while(col < result.size) {
          result(col) += value * row(col)
          col += 1
        }
      }
      i += 1
    }
    result
  }

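  // Row i of the product is (row i of this) * other; rows are computed in parallel.
  def *(other: Matrix): Matrix = {
    require(columns == other.rows, "columns of the first matrix must equal rows of the second")
    new Matrix(rowVectors.par.map(other.mulRow).toArray)
  }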

  def equals(other: Matrix): Boolean = {
    java.util.Arrays.deepEquals(this.rowVectors.asInstanceOf[Array[Object]], other.rowVectors.asInstanceOf[Array[Object]])
  }

  override def equals(other: Any): Boolean = other match {
    case m: Matrix => equals(m)
    case _ => false
  }

  // Keep hashCode consistent with equals.
  override def hashCode: Int = java.util.Arrays.deepHashCode(rowVectors.asInstanceOf[Array[Object]])

  override def toString = {
    rowVectors.map(_.mkString(", ")).mkString("\n")
  }
}

def randMatrix(rows: Int, columns: Int): Matrix = {  
  new Matrix((1 to rows).map(_ => Array.fill(columns)(rand.nextDouble * 100)).toArray)
}

def sparseRandMatrix(rows: Int, columns: Int, ratio: Double): Matrix = {
  // Each entry is non-zero with probability `ratio`.
  new Matrix((1 to rows).map(_ => Array.fill(columns)(if(rand.nextDouble > ratio) 0.0 else rand.nextDouble * 100)).toArray)
}

val N = 2000

val m1 = sparseRandMatrix(N, N, 0.1) // only 10% of the numbers will be different from 0
val m2 = randMatrix(N, N)

val m3 = m1.rowVectors.toIterator

val m12 = time("m1 * m2")(m1 * m2)
val m32 = time("m3 * m2")(Matrix.mulLazy(m3, m2)) //doesn't take much time because the matrix multiplication is lazy

println(m32) // prints a placeholder for the iterator, not the matrix rows

println("m12 == m32 = " + (new Matrix(m32.toArray) == m12))
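The lazy, row-at-a-time idea does not depend on the `Matrix` class. Below is a minimal sketch of the same approach with one change that is not from the answer: each row of X is stored in a sparse form (parallel arrays of column indices and non-zero values, a hypothetical `SparseRow` type), so the inner loop touches only the non-zero entries instead of testing every entry against 0. Y stays a dense array of rows, and the `Iterator` stands in for whatever lazily reads X (for example from a database). `StreamingMul`, `mulSparseRow` and `toSparse` are illustrative names.

```scala
// Hypothetical sparse row: indices(j) is the column of the j-th non-zero
// entry and values(j) its value. Indices are assumed to lie in [0, y.length).
final case class SparseRow(indices: Array[Int], values: Array[Double])

object StreamingMul {
  // Multiply one sparse row of X by the dense matrix Y (stored as rows).
  // The result is the corresponding dense row of X * Y.
  def mulSparseRow(row: SparseRow, y: Array[Array[Double]]): Array[Double] = {
    val cols = y.head.length
    val result = new Array[Double](cols)
    var j = 0
    while (j < row.indices.length) {
      val value = row.values(j)
      val yRow = y(row.indices(j)) // only rows of Y hit by a non-zero are read
      var col = 0
      while (col < cols) {
        result(col) += value * yRow(col)
        col += 1
      }
      j += 1
    }
    result
  }

  // Lazily map each incoming row of X to a row of X * Y; nothing is
  // computed until the returned iterator is consumed.
  def mulLazy(x: Iterator[SparseRow], y: Array[Array[Double]]): Iterator[Array[Double]] =
    x.map(row => mulSparseRow(row, y))

  // Convert a dense row to the sparse form, keeping only non-zero entries.
  def toSparse(dense: Array[Double]): SparseRow = {
    val nz = dense.indices.filter(i => dense(i) != 0.0).toArray
    SparseRow(nz, nz.map(dense(_)))
  }
}

// Small example: X is 2 x 3 (mostly zeros), Y is 3 x 2.
val xDense = Array(Array(0.0, 2.0, 0.0), Array(1.0, 0.0, 3.0))
val y = Array(Array(1.0, 2.0), Array(3.0, 4.0), Array(5.0, 6.0))

val product = StreamingMul.mulLazy(xDense.iterator.map(StreamingMul.toSparse), y)
product.foreach(r => println(r.mkString(", ")))
```

Because `mulLazy` only maps over the input iterator, memory use stays at roughly one row of X and one row of the result at a time, plus Y itself, which is the property the lazy version relies on for a 500k-row X.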