标签: rdd

Spark中的DataFrame,Dataset和RDD之间的区别

我只是想知道Apache Spark中的RDDDataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?

你能把一个转换成另一个吗?

apache-spark rdd apache-spark-sql apache-spark-dataset

228
推荐指数
10
解决办法
10万
查看次数

Spark - repartition()vs coalesce()

根据Learning Spark的说法

请记住,重新分区数据是一项相当昂贵的操作.Spark还有一个优化版本的repartition(),称为coalesce(),它允许避免数据移动,但前提是你减少了RDD分区的数量.

我得到的一个区别是,使用repartition()可以增加/减少分区数量,但是使用coalesce()时,只能减少分区数量.

如果分区分布在多台机器上并运行coalesce(),它如何避免数据移动?

distributed-computing apache-spark rdd

208
推荐指数
13
解决办法
15万
查看次数

缓存和持久有什么区别?

RDD持久性方面,spark cache()persist()spark 之间有什么区别?

distributed-computing apache-spark rdd

197
推荐指数
6
解决办法
9万
查看次数

Scala与Python的Spark性能

我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.

有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6,每个核心都有一些东西需要处理.

Scala代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = { …
Run Code Online (Sandbox Code Playgroud)

performance scala apache-spark rdd pyspark

166
推荐指数
1
解决办法
4万
查看次数

(为什么)我们需要在RDD上调用缓存或持久化

当从文本文件或集合(或从另一个RDD)创建弹性分布式数据集(RDD)时,我们是否需要显式调用"cache"或"persist"来将RDD数据存储到内存中?或者默认情况下RDD数据是以分布式方式存储在内存中的吗?

val textFile = sc.textFile("/user/emp.txt")
Run Code Online (Sandbox Code Playgroud)

根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在节点的所有/部分内存中使用.

如果是这样,为什么我们需要在textFile RDD上调用"cache"或"persist"呢?

scala apache-spark rdd

161
推荐指数
5
解决办法
7万
查看次数

如何将rdd对象转换为spark中的dataframe

如何将RDD(org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])转换为Dataframe org.apache.spark.sql.DataFrame.我使用了将数据帧转换为rdd .rdd.处理完之后我想把它放回到数据帧中.我怎样才能做到这一点 ?

scala apache-spark rdd apache-spark-sql

128
推荐指数
7
解决办法
24万
查看次数

Apache Spark:map vs mapPartitions?

RDD mapmapPartitions方法有什么区别?并且flatMap表现得像map或喜欢mapPartitions?谢谢.

(编辑)即,两者之间的差异(在语义上或在执行方面)

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }
Run Code Online (Sandbox Code Playgroud)

和:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Run Code Online (Sandbox Code Playgroud)

performance scala apache-spark rdd

119
推荐指数
3
解决办法
10万
查看次数

79
推荐指数
1
解决办法
2万
查看次数

HashPartitioner如何运作?

我读了一下文档HashPartitioner.不幸的是,除了API调用之外没有解释太多.我假设HashPartitioner根据键的哈希对分布式集进行分区.例如,如果我的数据是这样的

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
Run Code Online (Sandbox Code Playgroud)

因此,分区器会将其放入不同的分区,同一个键落在同一个分区中.但是我不明白构造函数参数的意义

new HashPartitoner(numPartitions) //What does numPartitions do?
Run Code Online (Sandbox Code Playgroud)

对于上述数据集,如果我这样做,结果会有何不同

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)
Run Code Online (Sandbox Code Playgroud)

那么HashPartitioner工作怎么样呢?

scala partitioning apache-spark rdd

77
推荐指数
2
解决办法
3万
查看次数

DAG如何在RDD的幕后工作?

星火研究论文已规定了新的分布式编程模型,相比于传统的Hadoop MapReduce的,声称在许多情况下,特别是机器学习的简化和广阔的性能提升.但是,材料揭开internal mechanicsResilient Distributed DatasetsDirected Acyclic Graph似乎缺乏本文.

通过调查源代码可以更好地学习吗?

directed-acyclic-graphs apache-spark rdd

60
推荐指数
2
解决办法
4万
查看次数