标签: python-multiprocessing

Python 3:生成进程子类

multiprocessing我们可以使用已创建的上下文使用Python 3 创建非分叉进程:

ctx = multiprocessing.get_context('spawn')
p = ctx.Process(target=foo, args=(42,))
p.start()
Run Code Online (Sandbox Code Playgroud)

但假设我正在使用Process. 有没有办法Process使用除 之外的方法创建子类实例fork

python python-3.x python-multiprocessing

2
推荐指数
1
解决办法
583
查看次数

python3多进程共享numpy数组(只读)

我不确定这个标题是否适合我的情况:我想分享 numpy array 的原因是它可能是我的情况的潜在解决方案之一,但如果您有其他解决方案也很好。

我的任务:我需要实现一个具有多重处理的迭代算法,而每个进程都需要有一份数据副本(该数据很大,并且是只读的,并且在迭代算法期间不会改变)。

我写了一些伪代码来证明我的想法:

import multiprocessing


def worker_func(data, args):
    # do sth...
    return res

def compute(data, process_num, niter):
    data
    result = []
    args = init()

    for iter in range(niter):
        args_chunk = split_args(args, process_num)
        pool = multiprocessing.Pool()
        for i in range(process_num):
            result.append(pool.apply_async(worker_func,(data, args_chunk[i])))
        pool.close()
        pool.join()
        # aggregate result and update args
        for res in result:
            args = update_args(res.get())

if __name__ == "__main__":
    compute(data, 4, 100)
Run Code Online (Sandbox Code Playgroud)

问题是在每次迭代中,我都必须将数据传递给子进程,这是非常耗时的。

我想出了两种可能的解决方案:

  1. 在进程之间共享数据(它是 ndarray),这就是这个问题的标题。
  2. 保持子进程处于活动状态,例如守护进程或其他进程...并等待调用。通过这样做,我只需要在一开始就传递数据。

那么,有什么方法可以在进程之间共享只读 numpy 数组吗?或者,如果您很好地实现了解决方案 2,它也可以工作。 …

python numpy multiprocessing python-3.x python-multiprocessing

2
推荐指数
1
解决办法
1872
查看次数

如果从函数内部执行,带有“apply_async”的多处理池不会执行任何操作

我正在尝试使用该multiprocessing模块,更具体地说是该Pool.apply_async()功能。

这段代码运行良好:

import multiprocessing

def do():
    print("Foobar", flush=True)

with multiprocessing.Pool(1) as pool:
    for i in range(2):
        pool.apply_async(do)

    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

"Foobar"字符串被打印两次。

但是,如果我将此代码放入函数中然后调用该函数,则不会发生任何情况。没有错误也没有"Foobar",程序静静地结束。

import multiprocessing

def test():

    def do():
        print("Foobar", flush=True)

    with multiprocessing.Pool(1) as pool:
        for i in range(5):
            pool.apply_async(do)

        pool.close()
        pool.join()

test()
Run Code Online (Sandbox Code Playgroud)

为什么?我在 Linux 上使用 Python 3.7.3。

python asynchronous multiprocessing python-3.x python-multiprocessing

2
推荐指数
1
解决办法
3007
查看次数

Python多进程多次运行

我试图同时运行两个 python 文件(A.py 和 B.py)。我尝试以两种不同的方式运行它们,并使用两个单独的命令来运行它们,效果很好。

问题是,我希望有一个脚本文件来并行运行它们。我尝试了多重处理,如下代码:

if __name__ == '__main__':
    jobs=[]
    jobs.append(Process(target=A.start))
    jobs.append(Process(target=B.start))

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()
Run Code Online (Sandbox Code Playgroud)

结果是它运行 A 和 B 两次,我希望它们每个只运行一次。

问题的原因是什么?我该如何解决?或者还有其他解决方案可以并行运行两个 python 文件吗?

提前感谢您的帮助。

对于导入信息,我有三个文件:A.py、B.py 和 run.py。

在 A.py 中,我有:

from scapy.all import *
from scapy.layers.http import HTTPRequest 
from scapy.layers.http import HTTPResponse
from colorama import init, Fore
import docker
import time
import redis
Run Code Online (Sandbox Code Playgroud)

在 B.py 中,我有:

import json
import docker
import socket
import time
import psutil
import socket
import redis
import …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing python-multiprocessing

2
推荐指数
1
解决办法
3825
查看次数

共享 numpy 数组时的 python 多处理

我想通过利用多重处理来部分更改大型 numpy 数组中的值。

也就是说我最后想要得到[[100, 100, 100], [100, 100, 100]]。

但是,以下代码是错误的,它显示“RuntimeError:SynchronizedArray 对象只能通过继承在进程之间共享”

我应该怎么办?谢谢。

import numpy as np
import multiprocessing

from multiprocessing import RawArray, Array


def change_array(array, i, j):
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100
    print(np.frombuffer(array.get_obj()))

if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    X = Array('d', X_shape[0] * X_shape[1])
    # Wrap X as an numpy array so we can easily manipulates its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to …
Run Code Online (Sandbox Code Playgroud)

python arrays numpy python-multiprocessing

2
推荐指数
1
解决办法
3345
查看次数

Python 中的多重处理:并行化 for 循环以填充 Numpy 数组

我一直在阅读类似这样的帖子,但其中任何一个似乎都适合我的情况。我正在尝试使用 Python 中的多重处理并行化以下玩具示例,以在 for 循环内填充 Numpy 数组:

import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        for idx, val in enumerate(range(1, n+1)):
            result = pool.apply_async(func1, [val])
            my_array[idx] = result.get()
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time() …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing numpy python-3.x python-multiprocessing

2
推荐指数
1
解决办法
5196
查看次数

为什么 Google Compute Engine 的计算优化 CPU 在 Python 中进行数字运算时比笔记本电脑慢

我在 Precision 5520 笔记本电脑和 Google Compute Engine 的 c2-standard-4 上运行以下代码:

import multiprocessing as mp
import time

def foo():
    before = time.time()
    sum = 0
    for i in range(1, 100000000):
        sum += i
    print(time.time() - before, sum)

for i in range(mp.cpu_count()):
    mp.Process(target=foo).start()
Run Code Online (Sandbox Code Playgroud)

在这两种情况下,mp.cpu_count()都是 4。然而,出乎我意料的是,笔记本电脑上的每次计算需要 5.2 秒,而 GCE 上则需要 8.6 秒。

该笔记本电脑运行 Xeon E3-1505M v6 @ 3GHz。它是一个强大的 CPU,但我认为笔记本电脑的 CPU 无法与 Google HPC 服务器上的 CPU 相比(例如,由于热量限制)。

笔记本电脑上的Python版本是3.8.5。GCE 的 Python 版本为 3.9.2。

为什么会这样呢?

更新根据 @John Hanley 的回复,我更改了 Google Cloud Platform 的配置,因此每个核心仅使用一个 …

python performance google-cloud-platform python-multiprocessing

2
推荐指数
1
解决办法
371
查看次数

在Python多处理中,为什么子进程名称是__mp_main__?有没有办法用自定义名称覆盖它?

#!/usr/bin/env python3

import multiprocessing as mp


def child_process():
    print('Hi! My name is', __name__)


print('Hi! My name is', __name__)
if __name__ == '__main__':
    mp.Process(target=child_process).start()

Run Code Online (Sandbox Code Playgroud)

上面的代码输出如下: 上述代码的控制台输出

我很困惑

  1. 为什么是子进程名称__mp_main__
  2. 为什么要打印两次?

python multiprocessing python-multiprocessing

2
推荐指数
1
解决办法
1658
查看次数

ProcessPoolExecutor 无法执行我自己的函数,但执行打印工作

代码:

if __name__ == "__main__":
    p = ProcessPoolExecutor()
    p.submit(lambda x: print(x), "something")  # doesn't work
    p.submit(print, "something")  # works fine
    time.sleep(0.5)

Run Code Online (Sandbox Code Playgroud)

为什么这是有道理的?

python python-multiprocessing

2
推荐指数
1
解决办法
582
查看次数

JAX vmap vs pmap vs Python 多处理

我正在将一些代码从纯 Python 重写为 JAX。我已经达到了这样的程度:在我的旧代码中,我使用 Python 的多处理模块来并行计算单个节点中所有 CPU 核心上的函数,如下所示:

# start pool process 
pool = multiprocessing.Pool(processes=10) # if node has 10 CPU cores, start 10 processes

# use pool.map to evaluate function(input) for each input in parallel
# suppose len(inputs) is very large and 10 inputs are processed in parallel at a time
# store the results in a list called out
out = pool.map(function,inputs)

# close pool processes to free memory
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

我知道 JAX 有 vmap 和 pmap,但我不明白它们是否可以直接替代我上面使用的 multiprocessing.pool.map。 …

parallel-processing multiprocessing spmd python-multiprocessing jax

2
推荐指数
1
解决办法
1299
查看次数