我正在使用大约100 MB腌制的广播变量,我接近:
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
Run Code Online (Sandbox Code Playgroud)
在具有3个c3.2xlarge执行程序和m3.large驱动程序的集群上运行,并使用以下命令启动交互式会话:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
Run Code Online (Sandbox Code Playgroud)
在RDD中,如果我持久引用此广播变量,则内存使用量会爆炸.对于100 MB变量的100个引用,即使它被复制了100次,我预计数据使用总量不会超过10 GB(更不用说3个节点上的30 GB).但是,当我运行以下测试时,我看到内存不足错误:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
Run Code Online (Sandbox Code Playgroud)
堆栈跟踪:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", …Run Code Online (Sandbox Code Playgroud) 我的luigi.cfg文件中有以下行(在所有节点、调度程序和工作程序上):
[core]
parallel-scheduling: true
Run Code Online (Sandbox Code Playgroud)
然而,当我在我的 luigi 调度程序上监控 CPU 利用率时(有大约 4000 个任务的图表,处理来自大约 100 个工作人员的请求),它只使用调度程序上的单个内核,luigid单线程经常达到 100% CPU 利用率. 我的理解是这个配置变量应该并行化任务的调度。
消息来源表明该标志确实应该在调度程序上使用多个内核。在https://github.com/spotify/luigi/blob/master/luigi/interface.py#L194 中,调用https://github.com/spotify/luigi/blob/master/luigi/worker。 py#L498.complete()并行检查任务的状态。
让我的 Luigi 调度程序利用其所有核心我还缺少什么?