我正在使用这个答案,以便在Linux机器上运行Python中的多处理并行命令.
我的代码做了类似的事情:
import multiprocessing
import logging
def cycle(offset):
# Do stuff
def run():
for nprocess in process_per_cycle:
logger.info("Start cycle with %d processes", nprocess)
offsets = list(range(nprocess))
pool = multiprocessing.Pool(nprocess)
pool.map(cycle, offsets)
Run Code Online (Sandbox Code Playgroud)
但是我得到了这个错误:OSError: [Errno 24] Too many open files
所以,代码打开了太多文件描述符,即:它启动了太多进程而没有终止它们.
我修了它用这些线代替最后两行:
with multiprocessing.Pool(nprocess) as pool:
pool.map(cycle, offsets)
Run Code Online (Sandbox Code Playgroud)
但我不知道为什么这些线路修复了它.
在那下面发生了with
什么?
所以,我知道标题中两种方法的区别,但不知道实际含义。
据我了解:如果您使用的 NUM_WORKERS 数量多于实际可用的核心数量,您将面临性能大幅下降,因为您的操作系统不断地来回切换,试图保持并行。不知道这有多真实,但我在某处从比我聪明的人那里读到了它。
在它的文档中os.cpu_count()
说:
返回系统中 CPU 的数量。如果未确定则返回 None。该数字不等于当前进程可以使用的 CPU 数量。可用CPU的数量可以通过len(os.sched_getaffinity(0))获得
因此,我试图弄清楚“系统”指的是一个进程可使用的 CPU 数量是否多于“系统”中的 CPU 数量。
我只是想安全有效地实现multiprocessing.pool
功能。所以这是我的问题总结:
以下内容有何实际意义:
NUM_WORKERS = os.cpu_count() - 1
# vs.
NUM_WORKERS = len(os.sched_getaffinity(0)) - 1
Run Code Online (Sandbox Code Playgroud)
这-1
是因为我发现,如果我尝试在处理数据时工作,我的系统的延迟会少很多。
python parallel-processing multiprocessing python-multiprocessing process-pool
语境:
concurrent.futures.process.ProcessPool
来执行代码的Python 应用服务器(是的,我知道importlib.reload
有警告)
为了让它工作,我想我必须在进程池管理的importlib.reload
每个multiprocessing
进程中执行。
有没有办法向进程池中的所有进程提交一些东西?
python parallel-processing multiprocessing python-multiprocessing process-pool
我想监控不同进程的多个工作人员的进度。对于每个子进程,我都有自己的进度条,但它不能与ProcessPoolExecutor
执行程序正常工作。
def main():
with futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
fut_to_num = {}
for i in range(PROCESSES):
fut = executor.submit(execute_many_threads, i)
fut_to_num[fut] = i
for future in futures.as_completed(fut_to_num):
r = future.result()
# print('{} returned {}'.format(fut_to_num[future], r))
print('\nDone!\n')
def execute_many_threads(n_pool=0):
with futures.ThreadPoolExecutor(max_workers=THREADS) as executor:
for i in range(THREADS):
executor.submit(execute_thread, n_pool, i)
return n_pool+1
def execute_thread(n_pool=0, n_thread=0):
s = random.randint(1, 5)
thread_num = n_pool*(PROCESSES-1) + n_thread
progress = tqdm.tqdm(
desc='#{:02d}'.format(thread_num),
position=thread_num,
total=10*s,
leave=False,
)
# print('Executing {}: {}...'.format(thread_num, s))
for i in range(s): …
Run Code Online (Sandbox Code Playgroud) python concurrent.futures python-multiprocessing tqdm process-pool
我正在做一些并行处理,如下所示:
with mp.Pool(8) as tmpPool:
results = tmpPool.starmap(my_function, inputs)
Run Code Online (Sandbox Code Playgroud)
输入看起来像:[(1,0.2312),(5,0.52)...],即一个int和float的元组。
该代码运行良好,但我似乎无法将其包裹在加载栏(tqdm)上,例如可以使用imap方法完成,如下所示:
tqdm.tqdm(mp.imap(some_function,some_inputs))
Run Code Online (Sandbox Code Playgroud)
星图也可以做到吗?
谢谢!
python multiprocessing python-multiprocessing tqdm process-pool
类似的问题(但答案对我不起作用):如何取消使用 concurrent.futures.ProcessPoolExecutor 运行的长时间运行的子进程?
与上面链接的问题和提供的解决方案不同,在我的情况下,计算本身相当长(受 CPU 限制)并且无法循环运行以检查是否发生了某些事件。
以下代码的简化版本:
import asyncio
import concurrent.futures as futures
import time
class Simulator:
def __init__(self):
self._loop = None
self._lmz_executor = None
self._tasks = []
self._max_execution_time = time.monotonic() + 60
self._long_running_tasks = []
def initialise(self):
# Initialise the main asyncio loop
self._loop = asyncio.get_event_loop()
self._loop.set_default_executor(
futures.ThreadPoolExecutor(max_workers=3))
# Run separate processes of long computation task
self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)
def run(self):
self._tasks.extend(
[self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
)
try:
# Gather bot reasoner tasks
_reasoner_tasks = …
Run Code Online (Sandbox Code Playgroud) python subprocess python-asyncio python-multiprocessing process-pool
为什么以下使用该concurrent.futures
模块的Python代码会永远挂起?
import concurrent.futures
class A:
def f(self):
print("called")
class B(A):
def f(self):
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
executor.submit(super().f)
if __name__ == "__main__":
B().f()
Run Code Online (Sandbox Code Playgroud)
这个调用产生了一个无形的例外[Errno 24] Too many open files
(见它,更换线路executor.submit(super().f)
用print(executor.submit(super().f).exception())
)。
但是,按预期将打印替换ProcessPoolExecutor
为ThreadPoolExecutor
“被调用”。
为什么以下使用该multiprocessing.pool
模块的Python代码引发异常AssertionError: daemonic processes are not allowed to have children
?
import multiprocessing.pool
class A:
def f(self):
print("called")
class B(A):
def f(self):
pool = multiprocessing.pool.Pool(2)
pool.apply(super().f)
if __name__ == "__main__":
B().f()
Run Code Online (Sandbox Code Playgroud)
但是,按预期将打印替换Pool
为ThreadPool
“被调用”。
环境:CPython 3.7,MacOS …
我正在尝试捕获ProcessPoolExecutor
.
想象你有一个文件func.py
:
print("imported") # I do not want this print in subprocesses\n\ndef f(x):\n return x\n
Run Code Online (Sandbox Code Playgroud)\n然后你用ProcessPoolExecutor
类似的方式运行该函数
\nfrom concurrent.futures import ProcessPoolExecutor\nfrom func import f # \xe2\x9a\xa0\xef\xb8\x8f the import will print! \xe2\x9a\xa0\xef\xb8\x8f\n\nif __name__ == "__main__":\n with ProcessPoolExecutor() as ex: # \xe2\x9a\xa0\xef\xb8\x8f the import will happen here again and print! \xe2\x9a\xa0\xef\xb8\x8f\n futs = [ex.submit(f, i) for i in range(15)]\n for fut in futs:\n fut.result()\n
Run Code Online (Sandbox Code Playgroud)\n现在我可以使用例如捕获第一次导入的输出contextlib.redirect_stdout
,但是,我也想捕获子进程的所有输出并将它们重定向到主进程的标准输出。
在我的实际用例中,我收到了想要捕获的警告,但简单的打印就重现了该问题。
\n此代码在常规 CPython 3.5 下运行良好:
import concurrent.futures
def job(text):
print(text)
with concurrent.futures.ProcessPoolExecutor(1) as pool:
pool.submit(job, "hello")
Run Code Online (Sandbox Code Playgroud)
但是如果你把它作为 运行python -m doctest myfile.py
,它就会挂起。更改submit(job
为submit(print
使其不会挂起,使用ThreadPoolExecutor
代替ProcessPoolExecutor
.
为什么在 doctest 下运行时会挂起?
python doctest concurrent.futures python-multiprocessing process-pool
以下代码未按预期执行。
import multiprocessing
lock = multiprocessing.Lock()
def dummy():
def log_results_l1(results):
lock.acquire()
print("Writing results", results)
lock.release()
def mp_execute_instance_l1(cmd):
print(cmd)
return cmd
cmds = [x for x in range(10)]
pool = multiprocessing.Pool(processes=8)
for c in cmds:
pool.apply_async(mp_execute_instance_l1, args=(c, ), callback=log_results_l1)
pool.close()
pool.join()
print("done")
dummy()
Run Code Online (Sandbox Code Playgroud)
但是如果函数不是嵌套的,它确实有效。到底是怎么回事。