我需要将RDD分成两部分:
满足条件的1部分; 另一部分没有.我可以filter在原始RDD上做两次但看起来效率低下.有没有办法可以做我想要的事情?我在API和文献中都找不到任何东西.
我是一个新手,我希望在源数据框架下转换(从JSON文件加载):
+--+-----+-----+
|A |count|major|
+--+-----+-----+
| a| 1| m1|
| a| 1| m2|
| a| 2| m3|
| a| 3| m4|
| b| 4| m1|
| b| 1| m2|
| b| 2| m3|
| c| 3| m1|
| c| 4| m3|
| c| 5| m4|
| d| 6| m1|
| d| 1| m2|
| d| 2| m3|
| d| 3| m4|
| d| 4| m5|
| e| 4| m1|
| e| 5| m2|
| e| 1| …Run Code Online (Sandbox Code Playgroud) 所以假设我有一个3000行的rdd.2000个第一行是1类,最后1000行是class2.RDD分区为100个分区.
打电话的时候 RDD.randomSplit(0.8,0.2)
该功能是否也会改变rdd?我们的分裂只是连续20%的rdd样品?或者它是随机选择20%的分区?
理想情况下,生成的拆分与原始RDD具有相同的类分布.(即2:1)
谢谢
当我执行以下命令时:
scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist()
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at <console>:22
scala> rdd.partitions.size
res9: Int = 10
scala> rdd.partitioner.isDefined
res10: Boolean = true
scala> rdd.partitioner.get
res11: org.apache.spark.Partitioner = org.apache.spark.HashPartitioner@a
Run Code Online (Sandbox Code Playgroud)
它说有10个分区,分区完成使用HashPartitioner.但是当我执行以下命令时:
scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4)
...
scala> rdd.partitions.size
res6: Int = 4
scala> rdd.partitioner.isDefined
res8: Boolean = false
Run Code Online (Sandbox Code Playgroud)
它说有4个分区,并且没有定义分区器.那么,什么是Spark中的默认分区方案?/如何在第二种情况下对数据进行分区?
任何人都可以纠正我对Spark坚持的理解.
如果我们在RDD上执行了cache(),则它的值仅缓存在最初计算RDD的那些节点上.含义,如果存在100个节点的集群,则在第一个和第二个节点的分区中计算RDD.如果我们缓存了这个RDD,那么Spark将仅在第一个或第二个工作节点中缓存它的值.因此,当此Spark应用程序尝试在后续阶段使用此RDD时,Spark驱动程序必须从第一个/第二个节点获取值.
我对么?
(要么)
是RDD值持久存储在驱动程序内存而不是节点上的东西吗?
我有这样的RDD:
1 2 3
4 5 6
7 8 9
Run Code Online (Sandbox Code Playgroud)
这是一个矩阵.现在我想像这样转置RDD:
1 4 7
2 5 8
3 6 9
Run Code Online (Sandbox Code Playgroud)
我怎样才能做到这一点?
你将如何使用python在Spark中执行基本连接?在R中你可以使用merg()来做到这一点.使用python on spark的语法是什么:
使用两个表(RDD),每个表中都有一个具有公共密钥的列.
RDD(1):(key,U)
RDD(2):(key,V)
Run Code Online (Sandbox Code Playgroud)
我认为内部联接是这样的:
rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));
Run Code Online (Sandbox Code Playgroud)
是对的吗?我在互联网上搜索过,无法找到一个很好的连接示例.提前致谢.
所以,据我所知,一般情况下应该使用coalesce():
由于某个
filter或其他操作可能导致减少原始数据集(RDD,DF),分区数量减少.coalesce()过滤大型数据集后,可以更有效地运行操作.
我也明白它比repartition通过仅在必要时移动数据来减少混乱更便宜.我的问题是如何定义coalesce带(idealPartionionNo)的参数.我正在研究一个项目,该项目是从另一位工程师传递给我的,他使用下面的计算来计算该参数的值.
// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)
val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR
Run Code Online (Sandbox Code Playgroud)
然后将其与partitioner对象一起使用:
val partitioner = new HashPartitioner(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)
但也用于:
RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)
这是正确的方法吗?idealPartionionNo价值计算背后的主要思想是什么?什么是REPARTITION_FACTOR?我一般如何定义它?
此外,由于纱线负责确定对飞可用执行人有获得该号(的方式AVAILABLE_EXECUTOR_INSTANCES在运行),并利用它来进行计算idealPartionionNo(如更换NO_OF_EXECUTOR_INSTANCES用AVAILABLE_EXECUTOR_INSTANCES)?
理想情况下,表单的一些实际示例:
n 执行程序,其m 核心和分区因子 …我在HDFS上有一个文本文件,我想将它转换为Spark中的数据框.
我使用Spark Context加载文件,然后尝试从该文件生成单个列.
val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))
Run Code Online (Sandbox Code Playgroud)
执行此操作后,我正在尝试以下操作.
myFile1.toDF()
Run Code Online (Sandbox Code Playgroud)
我遇到了问题,因为myFile1 RDD中的元素现在是数组类型.
我该如何解决这个问题?
我有一个RDD结构
RDD[(String, String)]
Run Code Online (Sandbox Code Playgroud)
我想创建2个列表(rdd的每个维度一个).
我尝试使用rdd.foreach()并填充两个ListBuffers然后将它们转换为Lists,但我猜每个节点都创建自己的ListBuffer,因为在迭代之后BufferLists是空的.我该怎么做 ?
编辑:我的方法
val labeled = data_labeled.map { line =>
val parts = line.split(',')
(parts(5), parts(7))
}.cache()
var testList : ListBuffer[String] = new ListBuffer()
labeled.foreach(line =>
testList += line._1
)
val labeledList = testList.toList
println("rdd: " + labeled.count)
println("bufferList: " + testList.size)
println("list: " + labeledList.size)
Run Code Online (Sandbox Code Playgroud)
结果是:
rdd: 31990654
bufferList: 0
list: 0
Run Code Online (Sandbox Code Playgroud) apache-spark ×10
rdd ×10
scala ×4
dataframe ×2
python ×2
join ×1
list ×1
partitioning ×1
pyspark ×1