我正在使用这个答案,以便在Linux机器上运行Python中的多处理并行命令.
我的代码做了类似的事情:
import multiprocessing
import logging
def cycle(offset):
    # Do stuff
def run():
    for nprocess in process_per_cycle:
        logger.info("Start cycle with %d processes", nprocess)
        offsets = list(range(nprocess))
        pool = multiprocessing.Pool(nprocess)
        pool.map(cycle, offsets)
但是我得到了这个错误:OSError: [Errno 24] Too many open files
所以,代码打开了太多文件描述符,即:它启动了太多进程而没有终止它们.
我修了它用这些线代替最后两行:
    with multiprocessing.Pool(nprocess) as pool:
        pool.map(cycle, offsets)
但我不知道为什么这些线路修复了它.
在那下面发生了with什么?
所以,我知道标题中两种方法的区别,但不知道实际含义。
据我了解:如果您使用的 NUM_WORKERS 数量多于实际可用的核心数量,您将面临性能大幅下降,因为您的操作系统不断地来回切换,试图保持并行。不知道这有多真实,但我在某处从比我聪明的人那里读到了它。
在它的文档中os.cpu_count()说:
返回系统中 CPU 的数量。如果未确定则返回 None。该数字不等于当前进程可以使用的 CPU 数量。可用CPU的数量可以通过len(os.sched_getaffinity(0))获得
因此,我试图弄清楚“系统”指的是一个进程可使用的 CPU 数量是否多于“系统”中的 CPU 数量。
我只是想安全有效地实现multiprocessing.pool功能。所以这是我的问题总结:
以下内容有何实际意义:
NUM_WORKERS = os.cpu_count() - 1
# vs.
NUM_WORKERS = len(os.sched_getaffinity(0)) - 1
这-1是因为我发现,如果我尝试在处理数据时工作,我的系统的延迟会少很多。
python parallel-processing multiprocessing python-multiprocessing process-pool
语境:
concurrent.futures.process.ProcessPool来执行代码的Python 应用服务器(是的,我知道importlib.reload有警告)
为了让它工作,我想我必须在进程池管理的importlib.reload每个multiprocessing进程中执行。
有没有办法向进程池中的所有进程提交一些东西?
python parallel-processing multiprocessing python-multiprocessing process-pool
我正在做一些并行处理,如下所示:
with mp.Pool(8) as tmpPool:
        results = tmpPool.starmap(my_function, inputs)
输入看起来像:[(1,0.2312),(5,0.52)...],即一个int和float的元组。
该代码运行良好,但我似乎无法将其包裹在加载栏(tqdm)上,例如可以使用imap方法完成,如下所示:
tqdm.tqdm(mp.imap(some_function,some_inputs))
星图也可以做到吗?
谢谢!
python multiprocessing python-multiprocessing tqdm process-pool
我想监控不同进程的多个工作人员的进度。对于每个子进程,我都有自己的进度条,但它不能与ProcessPoolExecutor执行程序正常工作。
def main():
    with futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
        fut_to_num = {}
        for i in range(PROCESSES):
            fut = executor.submit(execute_many_threads, i)
            fut_to_num[fut] = i
        for future in futures.as_completed(fut_to_num):
            r = future.result()
            # print('{} returned {}'.format(fut_to_num[future], r))
    print('\nDone!\n')
def execute_many_threads(n_pool=0):
    with futures.ThreadPoolExecutor(max_workers=THREADS) as executor:
        for i in range(THREADS):
            executor.submit(execute_thread, n_pool, i)
    return n_pool+1
def execute_thread(n_pool=0, n_thread=0):
    s = random.randint(1, 5)
    thread_num = n_pool*(PROCESSES-1) + n_thread
    progress = tqdm.tqdm(
        desc='#{:02d}'.format(thread_num),
        position=thread_num,
        total=10*s,
        leave=False,
    )
    # print('Executing {}: {}...'.format(thread_num, s))
    for i in range(s): …python concurrent.futures python-multiprocessing tqdm process-pool
为什么以下使用该concurrent.futures模块的Python代码会永远挂起?
import concurrent.futures
class A:
    def f(self):
        print("called")
class B(A):
    def f(self):
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
        executor.submit(super().f)
if __name__ == "__main__":
    B().f()
这个调用产生了一个无形的例外[Errno 24] Too many open files(见它,更换线路executor.submit(super().f)用print(executor.submit(super().f).exception()))。
但是,按预期将打印替换ProcessPoolExecutor为ThreadPoolExecutor“被调用”。
为什么以下使用该multiprocessing.pool模块的Python代码引发异常AssertionError: daemonic processes are not allowed to have children?
import multiprocessing.pool
class A:
    def f(self):
        print("called")
class B(A):
    def f(self):
        pool = multiprocessing.pool.Pool(2)
        pool.apply(super().f)
if __name__ == "__main__":
    B().f()
但是,按预期将打印替换Pool为ThreadPool“被调用”。
环境:CPython 3.7,MacOS …
类似的问题(但答案对我不起作用):如何取消使用 concurrent.futures.ProcessPoolExecutor 运行的长时间运行的子进程?
与上面链接的问题和提供的解决方案不同,在我的情况下,计算本身相当长(受 CPU 限制)并且无法循环运行以检查是否发生了某些事件。
以下代码的简化版本:
import asyncio
import concurrent.futures as futures
import time
class Simulator:
    def __init__(self):
        self._loop = None
        self._lmz_executor = None
        self._tasks = []
        self._max_execution_time = time.monotonic() + 60
        self._long_running_tasks = []
    def initialise(self):
        # Initialise the main asyncio loop
        self._loop = asyncio.get_event_loop()
        self._loop.set_default_executor(
            futures.ThreadPoolExecutor(max_workers=3))
        # Run separate processes of long computation task
        self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)
    def run(self):
        self._tasks.extend(
            [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
        )
        try:
            # Gather bot reasoner tasks
            _reasoner_tasks = …python subprocess python-asyncio python-multiprocessing process-pool
我正在尝试捕获ProcessPoolExecutor.
想象你有一个文件func.py:
print("imported")  # I do not want this print in subprocesses\n\ndef f(x):\n    return x\n然后你用ProcessPoolExecutor类似的方式运行该函数
\nfrom concurrent.futures import ProcessPoolExecutor\nfrom func import f  # \xe2\x9a\xa0\xef\xb8\x8f the import will print! \xe2\x9a\xa0\xef\xb8\x8f\n\nif __name__ == "__main__":\n    with ProcessPoolExecutor() as ex:  # \xe2\x9a\xa0\xef\xb8\x8f the import will happen here again and print! \xe2\x9a\xa0\xef\xb8\x8f\n        futs = [ex.submit(f, i) for i in range(15)]\n        for fut in futs:\n            fut.result()\n现在我可以使用例如捕获第一次导入的输出contextlib.redirect_stdout,但是,我也想捕获子进程的所有输出并将它们重定向到主进程的标准输出。
在我的实际用例中,我收到了想要捕获的警告,但简单的打印就重现了该问题。
\n此代码在常规 CPython 3.5 下运行良好:
import concurrent.futures
def job(text):
    print(text)
with concurrent.futures.ProcessPoolExecutor(1) as pool:
    pool.submit(job, "hello")
但是如果你把它作为 运行python -m doctest myfile.py,它就会挂起。更改submit(job为submit(print使其不会挂起,使用ThreadPoolExecutor代替ProcessPoolExecutor.
为什么在 doctest 下运行时会挂起?
python doctest concurrent.futures python-multiprocessing process-pool
以下代码未按预期执行。
import multiprocessing
lock = multiprocessing.Lock()
def dummy():
    def log_results_l1(results):
        lock.acquire()
        print("Writing results", results)
        lock.release()
    def mp_execute_instance_l1(cmd):
        print(cmd)
        return cmd
    cmds = [x for x in range(10)]
    pool = multiprocessing.Pool(processes=8)
    for c in cmds:
        pool.apply_async(mp_execute_instance_l1, args=(c, ), callback=log_results_l1)
    pool.close()
    pool.join()
    print("done")
dummy()
但是如果函数不是嵌套的,它确实有效。到底是怎么回事。