Pab*_*blo 5 python pickle pandas python-multiprocessing
我们正在使用multiprocessing库(python 3.6)运行脚本,其中将big pd.DataFrames作为参数传递给函数:
from multiprocessing import Pool
import time
def my_function(big_df):
# do something time consuming
time.sleep(50)
if __name__ == '__main__':
with Pool(10) as p:
res = {}
output = {}
for id, big_df in some_dict_of_big_dfs:
res[id] = p.apply_async(my_function,(big_df ,))
output = {u : res[id].get() for id in id_list}
Run Code Online (Sandbox Code Playgroud)
问题是我们从pickle库中得到一个错误。
原因:“ OverflowError('无法序列化大于4GiB的字节对象',)”
我们知道pickle v4可以序列化更大的对象相关的问题,链接,但是我们不知道如何修改所multiprocessing使用的协议。
有人知道该怎么办吗?谢谢 !!
Pab*_*blo 14
显然,关于这个主题有一个未解决的问题,并且在这个特定的答案中描述了一些相关的举措。我找到了一种方法来更改基于此答案pickle的multiprocessing库中使用的默认协议。正如评论中指出的,这个解决方案只适用于 Linux 和 OS 多处理库
解决方案:
您首先创建一个新的分离模块
pickle4reducer.py
from multiprocessing.reduction import ForkingPickler, AbstractReducer
class ForkingPickler4(ForkingPickler):
def __init__(self, *args):
if len(args) > 1:
args[1] = 2
else:
args.append(2)
super().__init__(*args)
@classmethod
def dumps(cls, obj, protocol=4):
return ForkingPickler.dumps(obj, protocol)
def dump(obj, file, protocol=4):
ForkingPickler4(file, protocol).dump(obj)
class Pickle4Reducer(AbstractReducer):
ForkingPickler = ForkingPickler4
register = ForkingPickler4.register
dump = dump
Run Code Online (Sandbox Code Playgroud)
然后,在您的主脚本中,您需要添加以下内容:
import pickle4reducer
import multiprocessing as mp
ctx = mp.get_context()
ctx.reducer = pickle4reducer.Pickle4Reducer()
with mp.Pool(4) as p:
# do something
Run Code Online (Sandbox Code Playgroud)
这可能会解决溢出的问题。
但是,警告,您可能会考虑在做任何事情之前阅读本文,否则您可能会遇到与我相同的错误:
'i' 格式需要 -2147483648 <= number <= 2147483647
(这个错误的原因在上面的链接中有很好的解释)。长话短说,multiprocessing使用pickle协议通过其所有过程发送数据,如果您已经达到4gb限制,这可能意味着您可能会考虑将您的函数重新定义为“无效”方法而不是输入/输出方法。所有这些入站/出站数据都会增加 RAM 使用量,可能是构造效率低下(我的情况),最好将所有进程指向同一个对象,而不是为每个调用创建一个新副本。
希望这可以帮助。
| 归档时间: |
|
| 查看次数: |
3640 次 |
| 最近记录: |