标签: process-pool

多处理返回"太多打开的文件",但使用`with ... as`修复它.为什么?

我正在使用这个答案,以便在Linux机器上运行Python中的多处理并行命令.

我的代码做了类似的事情:

import multiprocessing
import logging

def cycle(offset):
    # Do stuff

def run():
    for nprocess in process_per_cycle:
        logger.info("Start cycle with %d processes", nprocess)
        offsets = list(range(nprocess))
        pool = multiprocessing.Pool(nprocess)
        pool.map(cycle, offsets)
Run Code Online (Sandbox Code Playgroud)

但是我得到了这个错误:OSError: [Errno 24] Too many open files
所以,代码打开了太多文件描述符,即:它启动了太多进程而没有终止它们.

我修了它用这些线代替最后两行:

    with multiprocessing.Pool(nprocess) as pool:
        pool.map(cycle, offsets)
Run Code Online (Sandbox Code Playgroud)

但我不知道为什么这些线路修复了它.

在那下面发生了with什么?

python multiprocessing with-statement process-pool

15
推荐指数
1
解决办法
9614
查看次数

os.sched_getaffinity(0) 与 os.cpu_count()

所以,我知道标题中两种方法的区别,但不知道实际含义。

据我了解:如果您使用的 NUM_WORKERS 数量多于实际可用的核心数量,您将面临性能大幅下降,因为您的操作系统不断地来回切换,试图保持并行。不知道这有多真实,但我在某处从比我聪明的人那里读到了它。

在它的文档中os.cpu_count()说:

返回系统中 CPU 的数量。如果未确定则返回 None。该数字不等于当前进程可以使用的 CPU 数量。可用CPU的数量可以通过len(os.sched_getaffinity(0))获得

因此,我试图弄清楚“系统”指的是一个进程可使用的 CPU 数量是否多于“系统”中的 CPU 数量。

我只是想安全有效地实现multiprocessing.pool功能。所以这是我的问题总结:

以下内容有何实际意义:

NUM_WORKERS = os.cpu_count() - 1
# vs.
NUM_WORKERS = len(os.sched_getaffinity(0)) - 1
Run Code Online (Sandbox Code Playgroud)

-1是因为我发现,如果我尝试在处理数据时工作,我的系统的延迟会少很多。

python parallel-processing multiprocessing python-multiprocessing process-pool

12
推荐指数
2
解决办法
7195
查看次数

提交代码以执行到 concurrent.futures.ProcessPool 中的所有进程

语境:

  • 一个使用 aconcurrent.futures.process.ProcessPool来执行代码的Python 应用服务器
  • 我们有时想在不重启整个服务器进程的情况下热重载导入的代码

(是的,我知道importlib.reload警告

为了让它工作,我想我必须在进程池管理的importlib.reload每个multiprocessing进程中执行。

有没有办法向进程池中的所有进程提交一些东西?

python parallel-processing multiprocessing python-multiprocessing process-pool

8
推荐指数
1
解决办法
153
查看次数

将 tqdm 与多重处理结合使用以实现多个进度条

我想监控不同进程的多个工作人员的进度。对于每个子进程,我都有自己的进度条,但它不能与ProcessPoolExecutor执行程序正常工作。

def main():
    with futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
        fut_to_num = {}
        for i in range(PROCESSES):
            fut = executor.submit(execute_many_threads, i)
            fut_to_num[fut] = i

        for future in futures.as_completed(fut_to_num):
            r = future.result()
            # print('{} returned {}'.format(fut_to_num[future], r))
    print('\nDone!\n')


def execute_many_threads(n_pool=0):
    with futures.ThreadPoolExecutor(max_workers=THREADS) as executor:
        for i in range(THREADS):
            executor.submit(execute_thread, n_pool, i)
    return n_pool+1


def execute_thread(n_pool=0, n_thread=0):
    s = random.randint(1, 5)
    thread_num = n_pool*(PROCESSES-1) + n_thread

    progress = tqdm.tqdm(
        desc='#{:02d}'.format(thread_num),
        position=thread_num,
        total=10*s,
        leave=False,
    )
    # print('Executing {}: {}...'.format(thread_num, s))
    for i in range(s): …
Run Code Online (Sandbox Code Playgroud)

python concurrent.futures python-multiprocessing tqdm process-pool

7
推荐指数
0
解决办法
6045
查看次数

星图结合tqdm?

我正在做一些并行处理,如下所示:

with mp.Pool(8) as tmpPool:
        results = tmpPool.starmap(my_function, inputs)
Run Code Online (Sandbox Code Playgroud)

输入看起来像:[(1,0.2312),(5,0.52)...],即一个int和float的元组。

该代码运行良好,但我似乎无法将其包裹在加载栏(tqdm)上,例如可以使用imap方法完成,如下所示:

tqdm.tqdm(mp.imap(some_function,some_inputs))
Run Code Online (Sandbox Code Playgroud)

星图也可以做到吗?

谢谢!

python multiprocessing python-multiprocessing tqdm process-pool

7
推荐指数
3
解决办法
340
查看次数

如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?

类似的问题(但答案对我不起作用):如何取消使用 concurrent.futures.ProcessPoolExecutor 运行的长时间运行的子进程?

与上面链接的问题和提供的解决方案不同,在我的情况下,计算本身相当长(受 CPU 限制)并且无法循环运行以检查是否发生了某些事件。

以下代码的简化版本:

import asyncio
import concurrent.futures as futures
import time

class Simulator:
    def __init__(self):
        self._loop = None
        self._lmz_executor = None
        self._tasks = []
        self._max_execution_time = time.monotonic() + 60
        self._long_running_tasks = []

    def initialise(self):
        # Initialise the main asyncio loop
        self._loop = asyncio.get_event_loop()
        self._loop.set_default_executor(
            futures.ThreadPoolExecutor(max_workers=3))

        # Run separate processes of long computation task
        self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)

    def run(self):
        self._tasks.extend(
            [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
        )

        try:
            # Gather bot reasoner tasks
            _reasoner_tasks = …
Run Code Online (Sandbox Code Playgroud)

python subprocess python-asyncio python-multiprocessing process-pool

5
推荐指数
1
解决办法
2043
查看次数

为什么并发.futures.ProcessPoolExecutor和multiprocessing.pool.Pool在Python中使用super失败?

为什么以下使用该concurrent.futures模块的Python代码会永远挂起?

import concurrent.futures


class A:

    def f(self):
        print("called")


class B(A):

    def f(self):
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
        executor.submit(super().f)


if __name__ == "__main__":
    B().f()
Run Code Online (Sandbox Code Playgroud)

这个调用产生了一个无形的例外[Errno 24] Too many open files(见它,更换线路executor.submit(super().f)print(executor.submit(super().f).exception()))。

但是,按预期将打印替换ProcessPoolExecutorThreadPoolExecutor“被调用”。

为什么以下使用该multiprocessing.pool模块的Python代码引发异常AssertionError: daemonic processes are not allowed to have children

import multiprocessing.pool


class A:

    def f(self):
        print("called")


class B(A):

    def f(self):
        pool = multiprocessing.pool.Pool(2)
        pool.apply(super().f)


if __name__ == "__main__":
    B().f()
Run Code Online (Sandbox Code Playgroud)

但是,按预期将打印替换PoolThreadPool“被调用”。

环境:CPython 3.7,MacOS …

python pickle python-multiprocessing process-pool

5
推荐指数
1
解决办法
544
查看次数

捕获/重定向 ProcessPoolExecutor 的所有输出

我正在尝试捕获ProcessPoolExecutor.

\n

想象你有一个文件func.py

\n
print("imported")  # I do not want this print in subprocesses\n\ndef f(x):\n    return x\n
Run Code Online (Sandbox Code Playgroud)\n

然后你用ProcessPoolExecutor类似的方式运行该函数

\n
\nfrom concurrent.futures import ProcessPoolExecutor\nfrom func import f  # \xe2\x9a\xa0\xef\xb8\x8f the import will print! \xe2\x9a\xa0\xef\xb8\x8f\n\nif __name__ == "__main__":\n    with ProcessPoolExecutor() as ex:  # \xe2\x9a\xa0\xef\xb8\x8f the import will happen here again and print! \xe2\x9a\xa0\xef\xb8\x8f\n        futs = [ex.submit(f, i) for i in range(15)]\n        for fut in futs:\n            fut.result()\n
Run Code Online (Sandbox Code Playgroud)\n

现在我可以使用例如捕获第一次导入的输出contextlib.redirect_stdout,但是,我也想捕获子进程的所有输出并将它们重定向到主进程的标准输出。

\n

在我的实际用例中,我收到了想要捕获的警告,但简单的打印就重现了该问题。

\n

这与防止以下错误相关:https://github.com/Textualize/rich/issues/2371 …

python subprocess process-pool

5
推荐指数
0
解决办法
642
查看次数

Python doctest 使用 ProcessPoolExecutor 挂起

此代码在常规 CPython 3.5 下运行良好:

import concurrent.futures

def job(text):
    print(text)

with concurrent.futures.ProcessPoolExecutor(1) as pool:
    pool.submit(job, "hello")
Run Code Online (Sandbox Code Playgroud)

但是如果你把它作为 运行python -m doctest myfile.py,它就会挂起。更改submit(jobsubmit(print使其不会挂起,使用ThreadPoolExecutor代替ProcessPoolExecutor.

为什么在 doctest 下运行时会挂起?

python doctest concurrent.futures python-multiprocessing process-pool

4
推荐指数
2
解决办法
2792
查看次数

多处理池不适用于嵌套函数

以下代码未按预期执行。

import multiprocessing

lock = multiprocessing.Lock()
def dummy():
    def log_results_l1(results):
        lock.acquire()
        print("Writing results", results)
        lock.release()

    def mp_execute_instance_l1(cmd):
        print(cmd)
        return cmd

    cmds = [x for x in range(10)]

    pool = multiprocessing.Pool(processes=8)

    for c in cmds:
        pool.apply_async(mp_execute_instance_l1, args=(c, ), callback=log_results_l1)

    pool.close()
    pool.join()
    print("done")


dummy()
Run Code Online (Sandbox Code Playgroud)

但是如果函数不是嵌套的,它确实有效。到底是怎么回事。

python python-2.7 python-multiprocessing process-pool

2
推荐指数
1
解决办法
2527
查看次数