在Spark中将循环分发到集群的不同计算机

CMW*_*siq -1 scala distributed-computing apache-spark

这是我在我的代码中运行的for循环:

 for(x<-0 to vertexArray.length-1)
  {
    for(y<-0 to vertexArray.length-1)
      {
        breakable {

          if (x.equals(y)) {
            break
          }
          else {
            var d1 = vertexArray(x)._2._2
            var d2 = vertexArray(y)._2._2
            val ps = new Period(d1, d2)

            if (ps.getMonths() == 0 && ps.getYears() == 0 && Math.abs(ps.toStandardHours().getHours()) <= 5) {
              edgeArray += Edge(vertexArray(x)._1, vertexArray(y)._1, Math.abs(ps.toStandardHours().getHours()))
            }
          }
        }
      }
  }
Run Code Online (Sandbox Code Playgroud)

我希望通过在群集中的多台计算机上分发此代码来加快此代码的运行时间.我在Spark上使用Scala on intelliJ-idea.我如何实现这种类型的代码在多台机器上工作?

zer*_*323 5

正如Mariano Kamp Spark 所说,这里可能不是一个好选择,那里有更好的选择.除此之外,任何必须处理相对较大的数据并且需要O(N ^ 2)时间的方法都是不可接受的.所以你要做的第一件事就是专注于选择合适的算法而不是平台.

仍然可以将其翻译为Spark.一种直接反映您的代码的天真方法是使用笛卡尔积:

def check(v1: T, v2: T): Option[U] = {
  if (v1 == v2) {
    None
  } else {
    // rest of your logic, Some[U] if all tests passed
    // None otherwise
    ???
  }
}

val vertexRDD = sc.parallelize(vertexArray)
  .map{case (v1, v2) => check(v1, 2)}
  .filter(_.isDefined)
  .map(_.get)
Run Code Online (Sandbox Code Playgroud)

如果vertexArray很小,你可以使用flatMap广播变量

val vertexBd = sc.broadcast(vertexArray)

vertexRDD.flatMap(v1 =>
  vertexBd.map(v2 => check(v1, v2)).filter(_.isDefined).map(_.get))
)
Run Code Online (Sandbox Code Playgroud)

另一个改进是执行适当的连接.显而易见的条件是年月:

def toPair(v: T): ((Int, Int), T) = ??? // Return ((year, month), vertex)

val vertexPairs = vertexRDD.map(toPair)

vertexPairs.join(vertexPairs)
  .map{case ((_, _), (v1, v2)) => check(v1, v2) // Check should be simplified
  .filter(_.isDefined)
  .map(_.get)
Run Code Online (Sandbox Code Playgroud)

当然,这也可以通过广播变量来实现.你只需要分组vertexArray(年,月)对和广播Map[(Int, Int), T].

从这里开始,您可以通过分区进行天真检查并遍历按时间戳排序的数据来进一步改进:

def sortPartitionByDatetime(iter: Iterator[U]): Iterator[U] = ???
def yieldMatching(iter: Iterator[U]): Iterator[V] = {
  // flatmap keeping track of values in open window
  ???
}

vertexPairs
  .partitionBy(new HashPartitioner(n))
  .mapPartitions(sortPartitionByDatetime)
  .mapPartitions(yieldMatching)
Run Code Online (Sandbox Code Playgroud)

或者使用带窗口函数和范围子句的DataFrame.

注意:

所有类型都只是占位符.将来请尝试提供类型信息.现在,我只能说有一些元组和日期