rsc*_*c05 17 debugging multiprocessing python-3.x jupyter
I'm using the multiprocessing module and am still learning what it can do. I'm working through Dusty Phillips's book, and this code comes from it.
import multiprocessing
import random
from multiprocessing.pool import Pool

def prime_factor(value):
    factors = []
    for divisor in range(2, value-1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            factors.extend(prime_factor(divisor))
            factors.extend(prime_factor(quotient))
            break
    else:
        factors = [value]
    return factors

if __name__ == '__main__':
    pool = Pool()
    to_factor = [random.randint(100000, 50000000) for i in range(20)]
    results = pool.map(prime_factor, to_factor)
    for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))
In Windows PowerShell (not in the Jupyter notebook), I see the following:
Process SpawnPoolWorker-5:
Process SpawnPoolWorker-1:
AttributeError: Can't get attribute 'prime_factor' on <module '__main__' (built-in)>
I don't understand why the cell never finishes running.
rsc*_*c05 24
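This seems to be a design limitation of Jupyter notebooks (and of some other IDEs) rather than a bug: worker processes cannot look up a function that is defined only in the notebook's __main__ module, which is what the AttributeError above reports. We therefore have to write the function (prime_factor) to a separate file and import it as a module, then adjust the calling code accordingly. In my case, I put the function in a file named defs.py: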
def prime_factor(value):
    factors = []
    for divisor in range(2, value-1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            factors.extend(prime_factor(divisor))
            factors.extend(prime_factor(quotient))
            break
    else:
        factors = [value]
    return factors
Then, in the Jupyter notebook, I wrote the following lines:
import multiprocessing
import random
from multiprocessing import Pool
import defs

if __name__ == '__main__':
    pool = Pool()
    to_factor = [random.randint(100000, 50000000) for i in range(20)]
    results = pool.map(defs.prime_factor, to_factor)
    for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))
This solved my problem.
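As a rough illustration of why importing from defs.py helps (this sketch is not part of the original answer, and json.dumps is only a stand-in for any function that lives in an importable module): pickle, which multiprocessing uses to hand the target function to worker processes, stores a function as a module name plus a function name, so the receiving side must be able to import that module.

```python
import json
import pickle

# Functions are pickled by reference: the payload carries the module name and
# the function name, not the function's code.
payload = pickle.dumps(json.dumps)
print(b"json" in payload, b"dumps" in payload)

# Unpickling repeats the import and the attribute lookup. A freshly started
# worker can do this for a module on disk (such as defs.py), but not for a
# function that exists only in the notebook's __main__.
restored = pickle.loads(payload)
print(restored is json.dumps)
```

The same lookup is what fails with "Can't get attribute 'prime_factor' on <module '__main__' (built-in)>" when the function is defined in a notebook cell.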
To run the function without manually writing it to a separate file, we can write the task to a temporary file on the fly, import it, and execute it:
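from multiprocessing import Pool
from functools import partial
import inspect

def parallal_task(func, iterable, *params):
    # Write the function's source to a module file, renamed to "task",
    # so that worker processes can import it.
    with open('./tmp_func.py', 'w') as file:
        file.write(inspect.getsource(func).replace(func.__name__, "task"))

    from tmp_func import task

    if __name__ == '__main__':
        # Bind the extra arguments, as a single tuple, to the first parameter.
        func = partial(task, params)
        pool = Pool(processes=8)
        res = pool.map(func, iterable)
        pool.close()
        return res
    else:
        # Python 3 requires an exception instance here, not a bare string.
        raise RuntimeError("Not in Jupyter Notebook")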
We can then simply call it from a notebook cell, like this:
def long_running_task(params, id):
    # Heavy job here
    return params, id

data_list = range(8)

for res in parallal_task(long_running_task, data_list, "a", 1, "b"):
    # Each result is a (params, id) tuple; unpack it for printing.
    print(*res)
Output:
('a', 1, 'b') 0
('a', 1, 'b') 1
('a', 1, 'b') 2
('a', 1, 'b') 3
('a', 1, 'b') 4
('a', 1, 'b') 5
('a', 1, 'b') 6
('a', 1, 'b') 7
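The way the extra arguments reach the task can be sketched in isolation. This is a minimal example without a pool, using the same long_running_task signature as above; the name bound is introduced here only for illustration. functools.partial fixes the whole params tuple as the first argument, and each item of the iterable then arrives as the second.

```python
from functools import partial

def long_running_task(params, id):
    # Stand-in for the heavy job: just echo the inputs.
    return params, id

# The extra arguments are bound as one tuple to the first parameter.
bound = partial(long_running_task, ("a", 1, "b"))

# pool.map(bound, range(8)) would call bound(0), bound(1), ... in workers;
# here the same calls are made serially.
results = [bound(i) for i in range(3)]
print(results)
```

Each result therefore pairs the full params tuple with one item from the iterable.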
Note: if you are using Anaconda and want to see the progress of a heavy task, you can call print() inside long_running_task(). The printed text will appear in the Anaconda Prompt console.
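Strictly speaking, Python multiprocessing is not supported in Jupyter Notebook on Windows, even with if __name__ == "__main__".
One workaround on Windows 10 is to connect a browser running on Windows to a Jupyter server running in WSL.
You then get the same experience as on Linux.
You can set this up manually or refer to the script at https://github.com/mszhanyi/gemini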
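To check how a given environment starts worker processes, which is what differs between native Windows and WSL or Linux, a quick check along these lines may help. It is only a sketch; the variable names are illustrative.

```python
import multiprocessing
import sys

# Start method this interpreter uses by default ("spawn" on Windows).
start_method = multiprocessing.get_start_method()

# Start methods that are available on this platform.
available = multiprocessing.get_all_start_methods()

print(f"platform={sys.platform} start_method={start_method} available={available}")

# With "spawn", every worker is a fresh interpreter, so the target function
# has to be importable from a module rather than defined in a notebook cell.
uses_spawn = start_method == "spawn"
print("functions must be importable:", uses_spawn)
```

Running this in a notebook on Windows and again in one served from WSL should show the difference in start method.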