当我们想要多次使用它时,我们可以将RDD持久存储到内存和/或磁盘中.但是,我们以后必须自己解除它们,或者Spark是否会进行某种垃圾收集并在不再需要RDD时解除它的作用?我注意到如果我自己调用unpersist函数,我的性能会变慢.
我正在尝试利用spark分区.我试图做类似的事情
data.write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
这里的问题每个分区都会产生大量的镶木地板文件,如果我尝试从根目录中读取,会导致读取速度慢.
为了避免我试过
data.coalese(numPart).write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
但是,这会在每个分区中创建numPart数量的镶木地板文件.现在我的分区大小不同了.所以我理想的是希望每个分区有单独的合并.然而,这看起来并不容易.我需要访问所有分区合并到一定数量并存储在一个单独的位置.
写入后我应该如何使用分区来避免许多文件?
我想与Python解决方案共享这个特定的Apache Spark,因为它的文档很差.
我想用KEY计算K/V对(存储在Pairwise RDD中)的平均值.以下是示例数据的样子:
>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
Run Code Online (Sandbox Code Playgroud)
现在,下面的代码序列不是最佳的方法,但它确实有效.在我找到更好的解决方案之前,我正在做的事情.这并不可怕但是 - 正如你在答案部分看到的那样 - 有一种更简洁,有效的方式.
>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by …Run Code Online (Sandbox Code Playgroud) 是否可以将额外的参数传递给pySpark中的映射函数?具体来说,我有以下代码配方:
raw_data_rdd = sc.textFile("data.json", use_unicode=True)
json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line))
mapped_rdd = json_data_rdd.flatMap(processDataLine)
Run Code Online (Sandbox Code Playgroud)
processDataLine除了JSON对象之外,该函数还需要额外的参数,如下所示:
def processDataLine(dataline, arg1, arg2)
Run Code Online (Sandbox Code Playgroud)
如何传递额外的参数arg1,并arg2在flaMap功能?
我知道如何在scala中找到文件大小.但是如何在spark中找到RDD/dataframe大小?
斯卡拉:
object Main extends App {
val file = new java.io.File("hdfs://localhost:9000/samplefile.txt").toString()
println(file.length)
}
Run Code Online (Sandbox Code Playgroud)
火花:
val distFile = sc.textFile(file)
println(distFile.length)
Run Code Online (Sandbox Code Playgroud)
但如果我处理它没有获得文件大小.如何找到RDD大小?
我知道方法rdd.first(),它给了我RDD中的第一个元素.
还有方法rdd.take(num)这给了我第一个"num"元素.
但是没有可能通过索引获得元素吗?
谢谢.
isEmptyRDD上没有方法,那么如果RDD为空,最有效的测试方法是什么?
我需要为包含许多列的数据表生成row_numbers的完整列表.
在SQL中,这将如下所示:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
Run Code Online (Sandbox Code Playgroud)
现在,让我们说在Spark中我有一个形式为(K,V)的RDD,其中V =(col1,col2,col3),所以我的条目就像
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
Run Code Online (Sandbox Code Playgroud)
我想使用sortBy(),sortWith(),sortByKey(),zipWithIndex等命令对它们进行排序,并使用正确的row_number创建一个新的RDD.
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
Run Code Online (Sandbox Code Playgroud)
(我不关心括号,所以表格也可以是(K,(col1,col2,col3,rownum))而不是)
我该怎么做呢?
这是我的第一次尝试:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3) …Run Code Online (Sandbox Code Playgroud) 我需要RDDs在一个/多个列上加入两个普通的列.逻辑上,此操作等效于两个表的数据库连接操作.我想知道这是否只有通过Spark SQL或其他方式可行.
作为一个具体示例,请考虑r1使用主键的RDD ITEM_ID:
(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)
Run Code Online (Sandbox Code Playgroud)
和r2主键的RDD COMPANY_ID:
(COMPANY_ID, COMPANY_NAME, COMPANY_CITY)
Run Code Online (Sandbox Code Playgroud)
我想加入r1和r2.
如何才能做到这一点?
我想在Spark RDD中选择一系列元素.例如,我有一个带有一百个元素的RDD,我需要选择60到80之间的元素.我该怎么做?
我看到RDD有一个take(i:int)方法,它返回第一个i元素.但是没有相应的方法来获取最后的i元素,或者从某个索引开始的中间元素.