标签: rdd

Apache Spark RDD过滤成两个RDD

我需要将RDD分成两部分:

满足条件的1部分; 另一部分没有.我可以filter在原始RDD上做两次但看起来效率低下.有没有办法可以做我想要的事情?我在API和文献中都找不到任何东西.

apache-spark rdd

18
推荐指数
2
解决办法
8097
查看次数

Spark数据帧将多行转换为列

我是一个新手,我希望在源数据框架转换(从JSON文件加载):

+--+-----+-----+
|A |count|major|
+--+-----+-----+
| a|    1|   m1|
| a|    1|   m2|
| a|    2|   m3|
| a|    3|   m4|
| b|    4|   m1|
| b|    1|   m2|
| b|    2|   m3|
| c|    3|   m1|
| c|    4|   m3|
| c|    5|   m4|
| d|    6|   m1|
| d|    1|   m2|
| d|    2|   m3|
| d|    3|   m4|
| d|    4|   m5|
| e|    4|   m1|
| e|    5|   m2|
| e|    1| …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark rdd apache-spark-sql

18
推荐指数
2
解决办法
1万
查看次数

Sparks RDD.randomSplit如何实际拆分RDD

所以假设我有一个3000行的rdd.2000个第一行是1类,最后1000行是class2.RDD分区为100个分区.

打电话的时候 RDD.randomSplit(0.8,0.2)

该功能是否也会改变rdd?我们的分裂只是连续20%的rdd样品?或者它是随机选择20%的分区?

理想情况下,生成的拆分与原始RDD具有相同的类分布.(即2:1)

谢谢

apache-spark rdd

17
推荐指数
1
解决办法
9993
查看次数

Spark中的默认分区方案

当我执行以下命令时:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist()
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at <console>:22

scala> rdd.partitions.size
res9: Int = 10

scala> rdd.partitioner.isDefined
res10: Boolean = true


scala> rdd.partitioner.get
res11: org.apache.spark.Partitioner = org.apache.spark.HashPartitioner@a
Run Code Online (Sandbox Code Playgroud)

它说有10个分区,分区完成使用HashPartitioner.但是当我执行以下命令时:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4)
...
scala> rdd.partitions.size
res6: Int = 4
scala> rdd.partitioner.isDefined
res8: Boolean = false
Run Code Online (Sandbox Code Playgroud)

它说有4个分区,并且没有定义分区器.那么,什么是Spark中的默认分区方案?/如何在第二种情况下对数据进行分区?

partitioning apache-spark rdd

17
推荐指数
1
解决办法
5176
查看次数

了解缓存,坚持使用Spark

任何人都可以纠正我对Spark坚持的理解.

如果我们在RDD上执行了cache(),则它的值仅缓存在最初计算RDD的那些节点上.含义,如果存在100个节点的集群,则在第一个和第二个节点的分区中计算RDD.如果我们缓存了这个RDD,那么Spark将仅在第一个或第二个工作节点中缓存它的值.因此,当此Spark应用程序尝试在后续阶段使用此RDD时,Spark驱动程序必须从第一个/第二个节点获取值.

我对么?

(要么)

是RDD值持久存储在驱动程序内存而不是节点上的东西吗?

apache-spark rdd apache-spark-sql

17
推荐指数
1
解决办法
8487
查看次数

如何在Spark中转置RDD

我有这样的RDD:

1 2 3
4 5 6
7 8 9
Run Code Online (Sandbox Code Playgroud)

这是一个矩阵.现在我想像这样转置RDD:

1 4 7
2 5 8
3 6 9
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?

scala apache-spark rdd

16
推荐指数
3
解决办法
1万
查看次数

如何使用Python在Spark中执行两个RDD表的基本连接?

你将如何使用python在Spark中执行基本连接?在R中你可以使用merg()来做到这一点.使用python on spark的语法是什么:

  1. 内部联接
  2. 左外连接
  3. 交叉加入

使用两个表(RDD),每个表中都有一个具有公共密钥的列.

RDD(1):(key,U)
RDD(2):(key,V)
Run Code Online (Sandbox Code Playgroud)

我认为内部联接是这样的:

rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));
Run Code Online (Sandbox Code Playgroud)

是对的吗?我在互联网上搜索过,无法找到一个很好的连接示例.提前致谢.

python join apache-spark rdd pyspark

16
推荐指数
1
解决办法
4万
查看次数

如何计算合并的最佳numberOfPartitions?

所以,据我所知,一般情况下应该使用coalesce():

由于某个filter或其他操作可能导致减少原始数据集(RDD,DF),分区数量减少.coalesce()过滤大型数据集后,可以更有效地运行操作.

我也明白它比repartition通过仅在必要时移动数据来减少混乱更便宜.我的问题是如何定义coalesce带(idealPartionionNo)的参数.我正在研究一个项目,该项目是从另一位工程师传递给我的,他使用下面的计算来计算该参数的值.

// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)

val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR
Run Code Online (Sandbox Code Playgroud)

然后将其与partitioner对象一起使用:

val partitioner = new HashPartitioner(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)

但也用于:

RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)

这是正确的方法吗?idealPartionionNo价值计算背后的主要思想是什么?什么是REPARTITION_FACTOR?我一般如何定义它?

此外,由于纱线负责确定对飞可用执行人有获得该号(的方式AVAILABLE_EXECUTOR_INSTANCES在运行),并利用它来进行计算idealPartionionNo(如更换NO_OF_EXECUTOR_INSTANCESAVAILABLE_EXECUTOR_INSTANCES)?

理想情况下,表单的一些实际示例:

  • 这是一个数据集(大小);
  • 这是RDD/DF的一些转换和可能的重用.
  • 这是你应该重新分配/合并的地方.
  • 假设您有n 执行程序,其m 核心分区因子 …

scala apache-spark rdd

16
推荐指数
3
解决办法
5165
查看次数

如何从Spark中的文本文件创建DataFrame

我在HDFS上有一个文本文件,我想将它转换为Spark中的数据框.

我使用Spark Context加载文件,然后尝试从该文件生成单个列.

val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))
Run Code Online (Sandbox Code Playgroud)

执行此操作后,我正在尝试以下操作.

myFile1.toDF()
Run Code Online (Sandbox Code Playgroud)

我遇到了问题,因为myFile1 RDD中的元素现在是数组类型.

我该如何解决这个问题?

scala dataframe apache-spark rdd apache-spark-sql

15
推荐指数
3
解决办法
9万
查看次数

Spark:RDD到List

我有一个RDD结构

RDD[(String, String)]
Run Code Online (Sandbox Code Playgroud)

我想创建2个列表(rdd的每个维度一个).

我尝试使用rdd.foreach()并填充两个ListBuffers然后将它们转换为Lists,但我猜每个节点都创建自己的ListBuffer,因为在迭代之后BufferLists是空的.我该怎么做 ?

编辑:我的方法

val labeled = data_labeled.map { line =>
  val parts = line.split(',')
  (parts(5), parts(7))
}.cache()

var testList : ListBuffer[String] = new ListBuffer()

labeled.foreach(line =>
  testList += line._1
)
  val labeledList = testList.toList
  println("rdd: " + labeled.count)
  println("bufferList: " + testList.size)
  println("list: " + labeledList.size)
Run Code Online (Sandbox Code Playgroud)

结果是:

rdd: 31990654
bufferList: 0
list: 0
Run Code Online (Sandbox Code Playgroud)

scala list apache-spark rdd

15
推荐指数
1
解决办法
3万
查看次数