ale*_*kov 20 python iteration loops apache-spark pyspark
我是Spark的新手,我正在尝试用马尔科夫模型表示的质心实现一些迭代算法进行聚类(期望最大化).所以我需要进行迭代和连接.
我遇到的一个问题是每次迭代时间呈指数增长.
经过一些实验后,我发现在进行迭代时需要保留将在下一次迭代中重用的RDD,否则每个迭代spark都会创建执行计划,从开始重新计算RDD,从而增加计算时间.
init = sc.parallelize(xrange(10000000), 3)
init.cache()
for i in range(6):
print i
start = datetime.datetime.now()
init2 = init.map(lambda n: (n, n*3))
init = init2.map(lambda n: n[0])
# init.cache()
print init.count()
print str(datetime.datetime.now() - start)
Run Code Online (Sandbox Code Playgroud)
结果是:
0
10000000
0:00:04.283652
1
10000000
0:00:05.998830
2
10000000
0:00:08.771984
3
10000000
0:00:11.399581
4
10000000
0:00:14.206069
5
10000000
0:00:16.856993
Run Code Online (Sandbox Code Playgroud)
因此添加cache()会有所帮助,迭代时间也会变得不变.
init = sc.parallelize(xrange(10000000), 3)
init.cache()
for i in range(6):
print i
start = datetime.datetime.now()
init2 = init.map(lambda n: (n, n*3))
init = init2.map(lambda n: n[0])
init.cache()
print init.count()
print str(datetime.datetime.now() - start)
0
10000000
0:00:04.966835
1
10000000
0:00:04.609885
2
10000000
0:00:04.324358
3
10000000
0:00:04.248709
4
10000000
0:00:04.218724
5
10000000
0:00:04.223368
Run Code Online (Sandbox Code Playgroud)
但是当在迭代内部进行Join时,问题又回来了.这是一些简单的代码,我演示了这个问题.即使在每个RDD转换上进行缓存也无法解决问题:
init = sc.parallelize(xrange(10000), 3)
init.cache()
for i in range(6):
print i
start = datetime.datetime.now()
init2 = init.map(lambda n: (n, n*3))
init2.cache()
init3 = init.map(lambda n: (n, n*2))
init3.cache()
init4 = init2.join(init3)
init4.count()
init4.cache()
init = init4.map(lambda n: n[0])
init.cache()
print init.count()
print str(datetime.datetime.now() - start)
Run Code Online (Sandbox Code Playgroud)
这是输出.正如您所看到的迭代时间呈指数增长:(
0
10000
0:00:00.674115
1
10000
0:00:00.833377
2
10000
0:00:01.525314
3
10000
0:00:04.194715
4
10000
0:00:08.139040
5
10000
0:00:17.852815
Run Code Online (Sandbox Code Playgroud)
我真的很感激任何帮助:)
zer*_*323 27
摘要:
一般而言,迭代算法,尤其是具有自连接或自联合的算法,需要对以下内容进行控制:
这里描述的问题是缺乏前者的结果.在每次迭代中,分区数随着自连接而增加,从而导致指数模式.要解决这个问题,您必须控制每次迭代中的分区数量(见下文)或使用全局工具spark.default.parallelism
(请参阅Travis提供的答案).通常,第一种方法通常提供更多控制,并且不影响代码的其他部分.
原始答案:
据我所知,这里有两个交错的问题 - 在连接期间增加分区数量和重排开销.两者都可以轻松处理,所以让我们一步一步走.
首先让我们创建一个帮助器来收集统计信息:
import datetime
def get_stats(i, init, init2, init3, init4,
start, end, desc, cache, part, hashp):
return {
"i": i,
"init": init.getNumPartitions(),
"init1": init2.getNumPartitions(),
"init2": init3.getNumPartitions(),
"init4": init4.getNumPartitions(),
"time": str(end - start),
"timen": (end - start).seconds + (end - start).microseconds * 10 **-6,
"desc": desc,
"cache": cache,
"part": part,
"hashp": hashp
}
Run Code Online (Sandbox Code Playgroud)
处理缓存/分区的另一个帮助器
def procRDD(rdd, cache=True, part=False, hashp=False, npart=16):
rdd = rdd if not part else rdd.repartition(npart)
rdd = rdd if not hashp else rdd.partitionBy(npart)
return rdd if not cache else rdd.cache()
Run Code Online (Sandbox Code Playgroud)
提取管道逻辑:
def run(init, description, cache=True, part=False, hashp=False,
npart=16, n=6):
times = []
for i in range(n):
start = datetime.datetime.now()
init2 = procRDD(
init.map(lambda n: (n, n*3)),
cache, part, hashp, npart)
init3 = procRDD(
init.map(lambda n: (n, n*2)),
cache, part, hashp, npart)
# If part set to True limit number of the output partitions
init4 = init2.join(init3, npart) if part else init2.join(init3)
init = init4.map(lambda n: n[0])
if cache:
init4.cache()
init.cache()
init.count() # Force computations to get time
end = datetime.datetime.now()
times.append(get_stats(
i, init, init2, init3, init4,
start, end, description,
cache, part, hashp
))
return times
Run Code Online (Sandbox Code Playgroud)
并创建初始数据:
ncores = 8
init = sc.parallelize(xrange(10000), ncores * 2).cache()
Run Code Online (Sandbox Code Playgroud)
numPartitions
单独加入操作,如果未提供参数,则根据输入RDD的分区数调整输出中的分区数.这意味着每次迭代都会增加分区数量.如果分区的数量是大的事情变得丑陋.您可以通过为numPartitions
每次迭代提供连接或重新分区RDD的参数来处理这些问题.
timesCachePart = sqlContext.createDataFrame(
run(init, "cache + partition", True, True, False, ncores * 2))
timesCachePart.select("i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+-----------------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+-----------------+
|0| 16| 16| 16|0:00:01.145625|cache + partition|
|1| 16| 16| 16|0:00:01.090468|cache + partition|
|2| 16| 16| 16|0:00:01.059316|cache + partition|
|3| 16| 16| 16|0:00:01.029544|cache + partition|
|4| 16| 16| 16|0:00:01.033493|cache + partition|
|5| 16| 16| 16|0:00:01.007598|cache + partition|
+-+-----+-----+-----+--------------+-----------------+
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,当我们重新分区时,执行时间或多或少是恒定的.第二个问题是上面的数据是随机分区的.为了确保连接性能,我们希望在单个分区上具有相同的密钥.要实现这一点,我们可以使用散列分区器:
timesCacheHashPart = sqlContext.createDataFrame(
run(init, "cache + hashpart", True, True, True, ncores * 2))
timesCacheHashPart.select("i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+----------------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+----------------+
|0| 16| 16| 16|0:00:00.946379|cache + hashpart|
|1| 16| 16| 16|0:00:00.966519|cache + hashpart|
|2| 16| 16| 16|0:00:00.945501|cache + hashpart|
|3| 16| 16| 16|0:00:00.986777|cache + hashpart|
|4| 16| 16| 16|0:00:00.960989|cache + hashpart|
|5| 16| 16| 16|0:00:01.026648|cache + hashpart|
+-+-----+-----+-----+--------------+----------------+
Run Code Online (Sandbox Code Playgroud)
执行时间和以前一样是恒定的,与基本分区相比有一点改进.
现在让我们只使用缓存作为参考:
timesCacheOnly = sqlContext.createDataFrame(
run(init, "cache-only", True, False, False, ncores * 2))
timesCacheOnly.select("i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+----------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+----------+
|0| 16| 16| 32|0:00:00.992865|cache-only|
|1| 32| 32| 64|0:00:01.766940|cache-only|
|2| 64| 64| 128|0:00:03.675924|cache-only|
|3| 128| 128| 256|0:00:06.477492|cache-only|
|4| 256| 256| 512|0:00:11.929242|cache-only|
|5| 512| 512| 1024|0:00:23.284508|cache-only|
+-+-----+-----+-----+--------------+----------+
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,仅缓存版本的分区数(init2,init3,init4)在每次迭代时都会翻倍,执行时间与分区数成正比.
最后,如果我们使用散列分区器,我们可以检查是否可以提高大量分区的性能:
timesCacheHashPart512 = sqlContext.createDataFrame(
run(init, "cache + hashpart 512", True, True, True, 512))
timesCacheHashPart512.select(
"i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+--------------------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+--------------------+
|0| 512| 512| 512|0:00:14.492690|cache + hashpart 512|
|1| 512| 512| 512|0:00:20.215408|cache + hashpart 512|
|2| 512| 512| 512|0:00:20.408070|cache + hashpart 512|
|3| 512| 512| 512|0:00:20.390267|cache + hashpart 512|
|4| 512| 512| 512|0:00:20.362354|cache + hashpart 512|
|5| 512| 512| 512|0:00:19.878525|cache + hashpart 512|
+-+-----+-----+-----+--------------+--------------------+
Run Code Online (Sandbox Code Playgroud)
改进并不是那么令人印象深刻,但如果你有一个小集群和大量数据,它仍然值得尝试.
我想这里带走的消息是分区问题.有一些上下文可以为你处理(mllib
,sql
)但是如果你使用低级操作则由你负责.
问题是(正如在他的全面回答中指出的那样,零323),在没有指定分区数量的情况下调用join可能会(确实)导致越来越多的分区.分区数量(显然)可以不受限制地增长.在重复调用join时,有(至少)两种方法可以防止分区数量的增长(没有绑定).
方法1:
正如zero323指出的那样,您可以在调用join时手动指定分区数.例如
rdd1.join(rdd2, numPartitions)
Run Code Online (Sandbox Code Playgroud)
这将确保分区数不超过numPartitions,特别是分区数不会不断增长.
方法2:
创建SparkConf时,可以指定默认的并行级别.如果设置了此值,那么当您调用函数join
而不指定numPartitions时,将使用默认并行性,从而有效地限制分区数量并防止它们增长.您可以将此参数设置为
conf=SparkConf.set("spark.default.parallelism", numPartitions)
sc = SparkContex(conf=conf)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6020 次 |
最近记录: |