标签: rdd

当Spark意识到它不再被使用时,Spark会不会自己解决它?

当我们想要多次使用它时,我们可以将RDD持久存储到内存和/或磁盘中.但是,我们以后必须自己解除它们,或者Spark是否会进行某种垃圾收集并在不再需要RDD时解除它的作用?我注意到如果我自己调用unpersist函数,我的性能会变慢.

hadoop distributed-computing bigdata apache-spark rdd

29
推荐指数
1
解决办法
6332
查看次数

Spark镶木地板分区:大量文件

我正在尝试利用spark分区.我试图做类似的事情

data.write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)

这里的问题每个分区都会产生大量的镶木地板文件,如果我尝试从根目录中读取,会导致读取速度慢.

为了避免我试过

data.coalese(numPart).write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)

但是,这会在每个分区中创建numPart数量的镶木地板文件.现在我的分区大小不同了.所以我理想的是希望每个分区有单独的合并.然而,这看起来并不容易.我需要访问所有分区合并到一定数量并存储在一个单独的位置.

写入后我应该如何使用分区来避免许多文件?

bigdata apache-spark rdd spark-dataframe apache-spark-2.0

29
推荐指数
4
解决办法
3万
查看次数

使用Python计算Spark中的Pairwise(K,V)RDD中每个KEY的平均值

我想与Python解决方案共享这个特定的Apache Spark,因为它的文档很差.

我想用KEY计算K/V对(存储在Pairwise RDD中)的平均值.以下是示例数据的样子:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
Run Code Online (Sandbox Code Playgroud)

现在,下面的代码序列不是最佳的方法,但它确实有效.在我找到更好的解决方案之前,我正在做的事情.这并不可怕但是 - 正如你在答案部分看到的那样 - 有一种更简洁,有效的方式.

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by …
Run Code Online (Sandbox Code Playgroud)

python average aggregate apache-spark rdd

28
推荐指数
3
解决办法
3万
查看次数

Spark RDD - 使用额外参数进行映射

是否可以将额外的参数传递给pySpark中的映射函数?具体来说,我有以下代码配方:

raw_data_rdd = sc.textFile("data.json", use_unicode=True)
json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line))
mapped_rdd = json_data_rdd.flatMap(processDataLine)
Run Code Online (Sandbox Code Playgroud)

processDataLine除了JSON对象之外,该函数还需要额外的参数,如下所示:

def processDataLine(dataline, arg1, arg2)
Run Code Online (Sandbox Code Playgroud)

如何传递额外的参数arg1,并arg2flaMap功能?

python apache-spark rdd pyspark

28
推荐指数
1
解决办法
2万
查看次数

如何找到火花RDD /数据帧大小?

我知道如何在scala中找到文件大小.但是如何在spark中找到RDD/dataframe大小?

斯卡拉:

object Main extends App {
  val file = new java.io.File("hdfs://localhost:9000/samplefile.txt").toString()
  println(file.length)
}
Run Code Online (Sandbox Code Playgroud)

火花:

val distFile = sc.textFile(file)
println(distFile.length)
Run Code Online (Sandbox Code Playgroud)

但如果我处理它没有获得文件大小.如何找到RDD大小?

scala apache-spark rdd

28
推荐指数
3
解决办法
6万
查看次数

如何在Spark RDD(Java)中通过索引获取元素

我知道方法rdd.first(),它给了我RDD中的第一个元素.

还有方法rdd.take(num)这给了我第一个"num"元素.

但是没有可能通过索引获得元素吗?

谢谢.

java apache-spark rdd

27
推荐指数
1
解决办法
5万
查看次数

Spark:测试RDD是否为空的有效方法

isEmptyRDD上没有方法,那么如果RDD为空,最有效的测试方法是什么?

scala apache-spark rdd

26
推荐指数
1
解决办法
2万
查看次数

如何获得Spark RDD的SQL row_number等价物?

我需要为包含许多列的数据表生成row_numbers的完整列表.

在SQL中,这将如下所示:

select
   key_value,
   col1,
   col2,
   col3,
   row_number() over (partition by key_value order by col1, col2 desc, col3)
from
   temp
;
Run Code Online (Sandbox Code Playgroud)

现在,让我们说在Spark中我有一个形式为(K,V)的RDD,其中V =(col1,col2,col3),所以我的条目就像

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
Run Code Online (Sandbox Code Playgroud)

我想使用sortBy(),sortWith(),sortByKey(),zipWithIndex等命令对它们进行排序,并使用正确的row_number创建一个新的RDD.

(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
Run Code Online (Sandbox Code Playgroud)

(我不关心括号,所以表格也可以是(K,(col1,col2,col3,rownum))而不是)

我该怎么做呢?

这是我的第一次尝试:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3) …
Run Code Online (Sandbox Code Playgroud)

sql row-number apache-spark rdd

25
推荐指数
2
解决办法
3万
查看次数

使用/不使用Spark SQL加入两个普通RDD

我需要RDDs在一个/多个列上加入两个普通的列.逻辑上,此操作等效于两个表的数据库连接操作.我想知道这是否只有通过Spark SQL或其他方式可行.

作为一个具体示例,请考虑r1使用主键的RDD ITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)
Run Code Online (Sandbox Code Playgroud)

r2主键的RDD COMPANY_ID:

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY)
Run Code Online (Sandbox Code Playgroud)

我想加入r1r2.

如何才能做到这一点?

scala join apache-spark rdd apache-spark-sql

25
推荐指数
3
解决办法
7万
查看次数

如何在Spark RDD中选择一系列元素?

我想在Spark RDD中选择一系列元素.例如,我有一个带有一百个元素的RDD,我需要选择60到80之间的元素.我该怎么做?

我看到RDD有一个take(i:int)方法,它返回第一个i元素.但是没有相应的方法来获取最后的i元素,或者从某个索引开始的中间元素.

apache-spark rdd

24
推荐指数
3
解决办法
4万
查看次数