工作线程是否有Pool类,类似于多处理模块的Pool类?
我喜欢例如并行化地图功能的简单方法
def long_running_func(p):
c_func_no_gil(p)
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
Run Code Online (Sandbox Code Playgroud)
但是我想在没有创建新流程的开销的情况下这样做.
我知道GIL.但是,在我的用例中,该函数将是一个IO绑定的C函数,python包装器将在实际函数调用之前释放GIL.
我是否必须编写自己的线程池?
如何获取foo从线程目标返回的值?
from threading import Thread
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()
Run Code Online (Sandbox Code Playgroud)
如上所示,"一种显而易见的方法"不起作用:'foo'返回'foo'.
我有一个模块A,它通过获取数据并将其发送到模块B,C,D等进行分析然后将它们的结果连接在一起来执行基本映射/缩减.
但似乎模块B,C,D等本身不能创建多处理池,否则我得到
AssertionError: daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)
是否有可能以其他方式并行化这些工作?
为清楚起见,这里是一个(通常是坏的)婴儿的例子.(我通常会尝试/捕捉,但你得到了要点.)
A.py:
import B
from multiprocessing import Pool
def main():
p = Pool()
results = p.map(B.foo,range(10))
p.close()
p.join()
return results
B.py:
from multiprocessing import Pool
def foo(x):
p = Pool()
results = p.map(str,x)
p.close()
p.join()
return results
Run Code Online (Sandbox Code Playgroud) 在Python(2.7)中,我尝试在芹菜任务(celery 3.1.17)中创建进程(使用多处理),但它给出了错误:
daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)
谷歌搜索它,我发现最新版本的台球修复了"错误",但我有最新版本(3.3.0.20),错误仍在发生.我也试图在我的芹菜任务中实现这个解决方法,但它给出了同样的错误.
有谁知道怎么做?任何帮助表示赞赏,帕特里克
编辑:代码片段
任务:
from __future__ import absolute_import
from celery import shared_task
from embedder.models import Embedder
@shared_task
def embedder_update_task(embedder_id):
embedder = Embedder.objects.get(pk=embedder_id)
embedder.test()
Run Code Online (Sandbox Code Playgroud)
人工测试功能(从这里):
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = mp.Pool(num_procs)
result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])
# The following is not really needed, …Run Code Online (Sandbox Code Playgroud) 我有一个并行化的任务,从多个文件中读取内容,并将信息写入多个文件.
我目前用来并行化东西的成语:
listOfProcesses = []
for fileToBeRead in listOfFilesToBeRead:
process = multiprocessing.Process(target = somethingThatReadsFromAFileAndWritesSomeStuffOut, args = (fileToBeRead))
process.start()
listOfProcesses.append(process)
for process in listOfProcesses:
process.join()
Run Code Online (Sandbox Code Playgroud)
值得注意的是,somethingThatReadsFromAFileAndWritesSomeStuffOut它本身可以并行化任务(它可能必须从其他文件中读取等等).
现在,正如您所看到的,正在创建的进程数量不依赖于我在计算机上拥有的核心数量或其他任何内容,除了需要完成的任务数量.如果需要运行十个任务,请创建十个进程,依此类推.
这是创建任务的最佳方式吗?我应该考虑一下我的处理器有多少核心,等等?
我正在使用Python 2.7。
我目前正在使用ThreadPoolExecuter这样的:
params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
result = list(executor.map(f, params))
Run Code Online (Sandbox Code Playgroud)
问题是f有时运行时间过长。每当我运行时f,我都希望将其运行时间限制为100秒,然后终止运行。
最终,每个元素x在param,我想有与否的指示f必须被杀死,并且如果它不是-是什么返回值。即使f对于一个参数超时,我仍然希望使用下一个参数运行它。
该executer.map方法确实有一个timeout参数,但是它为整个运行(从调用时间到)设置了超时executer.map,而不是分别为每个线程设置超时。
什么是获得我想要的行为的最简单方法?
下面是我的 python 脚本。
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess
def sleepawhile(t):
print("Sleeping %i seconds..." …Run Code Online (Sandbox Code Playgroud) 我第一次尝试使用多处理。所以我想我会做一个非常简单的测试示例,它可以分解 100 个不同的数字。
from multiprocessing import Pool
from primefac import factorint
N = 10**30
L = range(N,N + 100)
pool = Pool()
pool.map(factorint, L)
Run Code Online (Sandbox Code Playgroud)
这给了我错误:
Traceback (most recent call last):
File "test.py", line 8, in <module>
pool.map(factorint, L)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
AssertionError: daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)
我看到Python 进程池非守护进程?讨论了这个问题,但我不明白为什么它与我的简单玩具示例相关。我究竟做错了什么?