python struct.error:'i'格式需要-2147483648 <= number <= 2147483647

SUN*_*ONG 14 python struct multiprocessing python-3.x starmap

问题

我愿意使用多处理模块进行特征工程(multiprocessing.Pool.starmap().但是,它给出了如下错误消息.我想这错误信息是关于输入的大小(2147483647 = 2 ^ 31 - 1),由于在相同的代码工作顺利为一小部分(frac=0.05)输入dataframes(train_scala,测试,TS)的.我将数据帧的类型转换为尽可能小,但它并没有变得更好.

anaconda版本是4.3.30,Python版本是3.6(64位).并且系统的内存大小超过128GB,超过20个内核.您是否建议使用任何指针或解决方案来解决此问题?如果这个问题是由多处理模块的大数据引起的,那么我应该使用多少小数据来利用Python3上的多处理模块?

码:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)
Run Code Online (Sandbox Code Playgroud)

错误信息:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
Run Code Online (Sandbox Code Playgroud)

额外的信息

  • historyCutoffs是一个整数列表
  • train_scala是一只熊猫DataFrame(377MB)
  • 测试是熊猫DataFrame(15MB)
  • ts是一只熊猫DataFrame(547MB)
  • ul_parts_path是一个目录列表(字符串)
  • is_train_seq是一个布尔值列表

额外代码:方法multiprocess_FE

def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train == True:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))    
    ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)
Run Code Online (Sandbox Code Playgroud)

Mar*_*ers 14

进程之间的通信协议使用酸洗,并且pickle数据以pickle数据的大小为前缀.对于您的方法,所有参数一起被腌制为一个对象.

您生成了一个对象,当pickle大于适合istruct formatter(四字节有符号整数)时,它会破坏代码所做的假设.

您可以将数据帧的读取委派给子进程,而只是发送加载数据帧所需的元数据.它们的总体大小接近1GB,在您的进程之间通过管道共享太多数据.

引用编程指南部分:

比pickle/unpickle更好地继承

当使用spawnforkserver启动方法时,multiprocessing需要对许多类型进行可选择,以便子进程可以使用它们.但是,通常应避免使用管道或队列将共享对象发送到其他进程.相反,您应该安排程序,以便需要访问其他地方创建的共享资源的进程可以从祖先进程继承它.

如果您没有在Windows上运行并使用spawn或者forkserver方法,则可以开始子进程之前将数据帧加载为全局变量,此时子进程将通过正常的OS写时复制内存页共享"继承"数据机制.

  • @MartijnPieters很棒的答案,谢谢!不过只是一个评论 - 这不是非常令人沮丧吗?很老的心态.例如,如果通过网络将数据传递给子进程,我就会理解这个问题; 但要在本地超过50GB的RAM,共享总线等进程之间这样做. - 谁在乎.应该是可扩展的.为Pete发出警告.不要在struct.error上破坏. (9认同)
  • 为什么我的泡菜这么大 (2认同)

Ale*_*lex 8

这个问题在最近的PR中修复为python https://github.com/python/cpython/pull/10305

如果需要,您可以在本地进行此更改以使其立即为您工作,而无需等待python和anaconda发布。

  • 如果您想知道,这个[更改](https://github.com/python/cpython/pull/10305/files#diff-c5770948f4fd3ebd154ea6f013f59422R392)不在[3.7.5](https://github.com/python)中/cpython/blob/v3.7.5/Lib/multiprocessing/connection.py#L392),它位于[3.8.0](https://github.com/python/cpython/blob/v3.8.0/Lib/multiprocessing /connection.py#L392)。 (2认同)