Das*_*asi 6 python multiprocessing python-3.x python-multiprocessing
我开始将多重处理纳入我的代码中,因为我试图自动化的任务从计算角度来说相当昂贵。根据我收集的 stackoverflow 信息,我的代码中的模块结构如下。我正在 Win10 中开发 python 3.7。
\nmain:除了加载输入等之外,还调用多处理函数的地方。
\nimport pandas as pd\nimport run\n\ndef do():\n df=pd.DataFrame({'Identifier': ['id_1', 'id_1', 'id_1', 'id_1', 'id_1', 'id_2', 'id_2', 'id_2', 'id_2', 'id_2', 'id_3', 'id_3', 'id_3', 'id_3', 'id_3'],\n 'float_id': [1, 2, 3, 4, 5, 10, 25, 33, 45, 50, .1, .2, .3, .4, .5],\n 'a': np.random.rand(15),\n 'b': np.random.rand(15),\n 'c': np.random.rand(15)})\n \n v_column=['a', 'b', 'c']\n \n df_out=run.function_multiprocessing(df, v_column)\n\n return df_out\n\nif __name__=='__main__':\n df_out=do()\n
Run Code Online (Sandbox Code Playgroud)\nimport defs\nimport pandas as pd\n\nimport multiprocessing\n\n\ndef iterator(data, id_col, value_col):\n for col in value_col:\n yield (data[col].values, data[id_col].values)\n\ndef function_multiprocessing(data, v_column):\n list_df=[]\n with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:\n for identifier, df_f in data.groupby(['Identifier']): \n print(identifier)\n data_f=pool.starmap(defs.function_to_apply, iterator(df_f, 'float_id', v_column))\n \n out=pd.DataFrame(data_f, index=[identifier])\n list_df.append(out)\n \n df_out=pd.concat(list_df)\n \n return df_out\n
Run Code Online (Sandbox Code Playgroud)\n这些模块都不属于我的 PYHTONPATH,并且位于同一文件夹中。我想做的是根据“float_id”中找到的值以及分别“a”、“b”和“c”列的值,对每个“标识符”应用一个函数。为了简单起见,我们可以考虑基于“float_id”值的列值的加权平均值。
\n当我执行代码时,无论我尝试什么,每个工作人员都会一遍又一遍地收到以下错误。
\nProcess SpawnPoolWorker-1:\nTraceback (most recent call last):\n File "C:\\Users\\xxxx\\AppData\\Local\\Continuum\\anaconda3\\lib\\multiprocessing\\process.py", line 297, in _bootstrap\n self.run()\n File "C:\\Users\\xxxx\\AppData\\Local\\Continuum\\anaconda3\\lib\\multiprocessing\\process.py", line 99, in run\n self._target(*self._args, **self._kwargs)\n File "C:\\Users\\xxxx\\AppData\\Local\\Continuum\\anaconda3\\lib\\multiprocessing\\pool.py", line 110, in worker\n task = get()\n File "C:\\Users\\xxxx\\AppData\\Local\\Continuum\\anaconda3\\lib\\multiprocessing\\queues.py", line 354, in get\n return _ForkingPickler.loads(res)\nModuleNotFoundError: No module named 'defs'\n
Run Code Online (Sandbox Code Playgroud)\n我尝试multiprocessing.set_start_method("fork")
在导入多处理模块后包含该行,这引发了错误。对于 也一样"spawn"
。我还尝试将模块“defs”作为参数包含在function_multiprocessing
方法中,并从 调用它main
,但没有成功。提供“function_to_apply”作为参数时会发生相同的错误。
\xc2\xbf我做错了什么?\xc2\xbf我怎样才能完成这个工作?
\n预先非常感谢!
\n更新:在方法defs.py
中导入模块时function_multiprocessing
def function_multiprocessing(data, v_column):\n import defs\n \n list_df=[]\n ...\n
Run Code Online (Sandbox Code Playgroud)\n它不会引发任何错误。但是,当将模块提供为变量时,它不起作用。
\n小智 0
有类似的错误。解决方法是subprocess.run
直接调用map
(类似于starmap
,但在这里我们不能使用它,因为它会解压参数而subprocess.run
不是下面的“main_script”),并将可执行逻辑放在 python 文件中 - 而不是函数:
pool.map(subprocess.run, [sys.executable, main_script, *args])
Run Code Online (Sandbox Code Playgroud)
在哪里
if __name__ == '__main__':
将在其中执行。你可以把你的逻辑从function_to_apply
那里放出来。 归档时间: |
|
查看次数: |
2484 次 |
最近记录: |