Rah*_*hul 7 python apache-spark pyspark
我有一堆元组,它们是复合键和值的形式.例如,
tfile.collect() = [(('id1','pd1','t1'),5.0),
(('id2','pd2','t2'),6.0),
(('id1','pd1','t2'),7.5),
(('id1','pd1','t3'),8.1) ]
Run Code Online (Sandbox Code Playgroud)
我想在这个集合上执行类似sql的操作,我可以根据id [1..n]或pd [1..n]聚合信息.我想使用vanilla pyspark apis实现而不使用SQLContext.在我目前的实现中,我正在阅读一堆文件并合并RDD.
def readfile():
fr = range(6,23)
tfile = sc.union([sc.textFile(basepath+str(f)+".txt")
.map(lambda view: set_feature(view,f))
.reduceByKey(lambda a, b: a+b)
for f in fr])
return tfile
Run Code Online (Sandbox Code Playgroud)
我打算创建一个聚合数组作为值.例如,
agg_tfile = [((id1,pd1),[5.0,7.5,8.1])]
Run Code Online (Sandbox Code Playgroud)
其中5.0,7.5,8.1代表[t1,t2,t3].我目前,使用字典通过vanilla python代码实现相同的目标.它适用于较小的数据集.但我担心,因为这可能无法扩展到更大的数据集.有没有一种使用pyspark apis实现相同目的的有效方法?
dap*_*mao 13
我的猜测是你想根据多个字段转置数据.
一种简单的方法是连接您将分组的目标字段,并使其成为配对RDD中的密钥.例如:
lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])
rdd = lines.map(lambda x: x.split(',')).map(lambda x: (x[0] + ', ' + x[1], x[3])).reduceByKey(lambda a, b: a + ', ' + b)
print rdd.collect()
Run Code Online (Sandbox Code Playgroud)
然后你将获得转置结果.
[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')]
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
17005 次 |
最近记录: |