我只是想知道Apache Spark中的RDD和DataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?
你能把一个转换成另一个吗?
根据Learning Spark的说法
请记住,重新分区数据是一项相当昂贵的操作.Spark还有一个优化版本的repartition(),称为coalesce(),它允许避免数据移动,但前提是你减少了RDD分区的数量.
我得到的一个区别是,使用repartition()可以增加/减少分区数量,但是使用coalesce()时,只能减少分区数量.
如果分区分布在多台机器上并运行coalesce(),它如何避免数据移动?
在RDD持久性方面,spark cache()和persist()spark 之间有什么区别?
我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.
有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6,每个核心都有一些东西需要处理.
Scala代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = { …Run Code Online (Sandbox Code Playgroud) 当从文本文件或集合(或从另一个RDD)创建弹性分布式数据集(RDD)时,我们是否需要显式调用"cache"或"persist"来将RDD数据存储到内存中?或者默认情况下RDD数据是以分布式方式存储在内存中的吗?
val textFile = sc.textFile("/user/emp.txt")
Run Code Online (Sandbox Code Playgroud)
根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在节点的所有/部分内存中使用.
如果是这样,为什么我们需要在textFile RDD上调用"cache"或"persist"呢?
如何将RDD(org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])转换为Dataframe org.apache.spark.sql.DataFrame.我使用了将数据帧转换为rdd .rdd.处理完之后我想把它放回到数据帧中.我怎样才能做到这一点 ?
RDD map和mapPartitions方法有什么区别?并且flatMap表现得像map或喜欢mapPartitions?谢谢.
(编辑)即,两者之间的差异(在语义上或在执行方面)
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
Run Code Online (Sandbox Code Playgroud)
和:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
Run Code Online (Sandbox Code Playgroud) 我读了一下文档HashPartitioner.不幸的是,除了API调用之外没有解释太多.我假设HashPartitioner根据键的哈希对分布式集进行分区.例如,如果我的数据是这样的
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
Run Code Online (Sandbox Code Playgroud)
因此,分区器会将其放入不同的分区,同一个键落在同一个分区中.但是我不明白构造函数参数的意义
new HashPartitoner(numPartitions) //What does numPartitions do?
Run Code Online (Sandbox Code Playgroud)
对于上述数据集,如果我这样做,结果会有何不同
new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)
Run Code Online (Sandbox Code Playgroud)
那么HashPartitioner工作怎么样呢?
在星火研究论文已规定了新的分布式编程模型,相比于传统的Hadoop MapReduce的,声称在许多情况下,特别是机器学习的简化和广阔的性能提升.但是,材料揭开internal mechanics上Resilient Distributed Datasets有Directed Acyclic Graph似乎缺乏本文.
通过调查源代码可以更好地学习吗?