从另一个模块调用时,模块未找到多处理错误

Das*_*asi 6 python multiprocessing python-3.x python-multiprocessing

我开始将多重处理纳入我的代码中,因为我试图自动化的任务从计算角度来说相当昂贵。根据我收集的 stackoverflow 信息,我的代码中的模块结构如下。我正在 Win10 中开发 python 3.7。

\n

main:除了加载输入等之外,还调用多处理函数的地方。

\n
import pandas as pd\nimport run\n\ndef do():\n    df=pd.DataFrame({'Identifier': ['id_1', 'id_1', 'id_1', 'id_1', 'id_1', 'id_2', 'id_2', 'id_2', 'id_2', 'id_2', 'id_3', 'id_3', 'id_3', 'id_3', 'id_3'],\n                 'float_id': [1, 2, 3, 4, 5, 10, 25, 33, 45, 50, .1, .2, .3, .4, .5],\n                 'a': np.random.rand(15),\n                 'b': np.random.rand(15),\n                 'c': np.random.rand(15)})\n    \n    v_column=['a', 'b', 'c']\n    \n    df_out=run.function_multiprocessing(df, v_column)\n\n    return df_out\n\nif __name__=='__main__':\n    df_out=do()\n
Run Code Online (Sandbox Code Playgroud)\n
    \n
  • run:多处理函数所在的位置
  • \n
\n
import defs\nimport pandas as pd\n\nimport multiprocessing\n\n\ndef iterator(data, id_col, value_col):\n    for col in value_col:\n        yield (data[col].values, data[id_col].values)\n\ndef function_multiprocessing(data, v_column):\n    list_df=[]\n    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:\n        for identifier, df_f in data.groupby(['Identifier']): \n            print(identifier)\n            data_f=pool.starmap(defs.function_to_apply, iterator(df_f, 'float_id', v_column))\n            \n            out=pd.DataFrame(data_f, index=[identifier])\n            list_df.append(out)\n    \n    df_out=pd.concat(list_df)\n    \n    return df_out\n
Run Code Online (Sandbox Code Playgroud)\n
    \n
  • defs:“多进程”函数(function_to_apply)所在的位置
  • \n
\n

这些模块都不属于我的 PYHTONPATH,并且位于同一文件夹中。我想做的是根据“float_id”中找到的值以及分别“a”、“b”和“c”列的值,对每个“标识符”应用一个函数。为了简单起见,我们可以考虑基于“float_id”值的列值的加权平均值。

\n

当我执行代码时,无论我尝试什么,每个工作人员都会一遍又一遍地收到以下错误。

\n
Process SpawnPoolWorker-1:\nTraceback (most recent call last):\n  File "C:\\Users\\xxxx\\AppData\\Local\\Continuum\\anaconda3\\lib\\multiprocessing\\process.py", line 297, in _bootstrap\n    self.run()\n  File "C:\\Users\\xxxx\\AppData\\Local\\Continuum\\anaconda3\\lib\\multiprocessing\\process.py", line 99, in run\n    self._target(*self._args, **self._kwargs)\n  File "C:\\Users\\xxxx\\AppData\\Local\\Continuum\\anaconda3\\lib\\multiprocessing\\pool.py", line 110, in worker\n    task = get()\n  File "C:\\Users\\xxxx\\AppData\\Local\\Continuum\\anaconda3\\lib\\multiprocessing\\queues.py", line 354, in get\n    return _ForkingPickler.loads(res)\nModuleNotFoundError: No module named 'defs'\n
Run Code Online (Sandbox Code Playgroud)\n

我尝试multiprocessing.set_start_method("fork")在导入多处理模块后包含该行,这引发了错误。对于 也一样"spawn"。我还尝试将模块“defs”作为参数包含在function_multiprocessing方法中,并从 调用它main,但没有成功。提供“function_to_apply”作为参数时会发生相同的错误。

\n

\xc2\xbf我做错了什么?\xc2\xbf我怎样才能完成这个工作?

\n

预先非常感谢!

\n

更新:在方法defs.py中导入模块时function_multiprocessing

\n
def function_multiprocessing(data, v_column):\n    import defs\n    \n    list_df=[]\n    ...\n
Run Code Online (Sandbox Code Playgroud)\n

它不会引发任何错误。但是,当将模块提供为变量时,它不起作用。

\n

小智 0

有类似的错误。解决方法是subprocess.run直接调用map(类似于starmap,但在这里我们不能使用它,因为它会解压参数而subprocess.run不是下面的“main_script”),并将可执行逻辑放在 python 文件中 - 而不是函数:

pool.map(subprocess.run, [sys.executable, main_script, *args])
Run Code Online (Sandbox Code Playgroud)

在哪里

  • sys.executable 为您提供 python 可执行文件
  • main_script 是一个字符串,其中包含 python 文件的完整路径,其中的代码 if __name__ == '__main__': 将在其中执行。你可以把你的逻辑从function_to_apply那里放出来。
  • args:可选给出的参数,您可以将它们解析为该脚本的命令行参数。如果您不需要此项,则可以从上面的列表中省略此项。