我读到Kryo序列化程序在Apache Spark中使用时可以提供更快的序列化.但是,我通过Python使用Spark.
切换到Kryo序列化器后,我仍然可以获得显着的好处吗?
我正在使用大约100 MB腌制的广播变量,我接近:
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
Run Code Online (Sandbox Code Playgroud)
在具有3个c3.2xlarge执行程序和m3.large驱动程序的集群上运行,并使用以下命令启动交互式会话:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
Run Code Online (Sandbox Code Playgroud)
在RDD中,如果我持久引用此广播变量,则内存使用量会爆炸.对于100 MB变量的100个引用,即使它被复制了100次,我预计数据使用总量不会超过10 GB(更不用说3个节点上的30 GB).但是,当我运行以下测试时,我看到内存不足错误:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
Run Code Online (Sandbox Code Playgroud)
堆栈跟踪:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", …Run Code Online (Sandbox Code Playgroud)