multiprocessing我们可以使用已创建的上下文使用Python 3 创建非分叉进程:
ctx = multiprocessing.get_context('spawn')
p = ctx.Process(target=foo, args=(42,))
p.start()
Run Code Online (Sandbox Code Playgroud)
但假设我正在使用Process. 有没有办法Process使用除 之外的方法创建子类实例fork?
我不确定这个标题是否适合我的情况:我想分享 numpy array 的原因是它可能是我的情况的潜在解决方案之一,但如果您有其他解决方案也很好。
我的任务:我需要实现一个具有多重处理的迭代算法,而每个进程都需要有一份数据副本(该数据很大,并且是只读的,并且在迭代算法期间不会改变)。
我写了一些伪代码来证明我的想法:
import multiprocessing
def worker_func(data, args):
# do sth...
return res
def compute(data, process_num, niter):
data
result = []
args = init()
for iter in range(niter):
args_chunk = split_args(args, process_num)
pool = multiprocessing.Pool()
for i in range(process_num):
result.append(pool.apply_async(worker_func,(data, args_chunk[i])))
pool.close()
pool.join()
# aggregate result and update args
for res in result:
args = update_args(res.get())
if __name__ == "__main__":
compute(data, 4, 100)
Run Code Online (Sandbox Code Playgroud)
问题是在每次迭代中,我都必须将数据传递给子进程,这是非常耗时的。
我想出了两种可能的解决方案:
那么,有什么方法可以在进程之间共享只读 numpy 数组吗?或者,如果您很好地实现了解决方案 2,它也可以工作。 …
python numpy multiprocessing python-3.x python-multiprocessing
我正在尝试使用该multiprocessing模块,更具体地说是该Pool.apply_async()功能。
这段代码运行良好:
import multiprocessing
def do():
print("Foobar", flush=True)
with multiprocessing.Pool(1) as pool:
for i in range(2):
pool.apply_async(do)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
该"Foobar"字符串被打印两次。
但是,如果我将此代码放入函数中然后调用该函数,则不会发生任何情况。没有错误也没有"Foobar",程序静静地结束。
import multiprocessing
def test():
def do():
print("Foobar", flush=True)
with multiprocessing.Pool(1) as pool:
for i in range(5):
pool.apply_async(do)
pool.close()
pool.join()
test()
Run Code Online (Sandbox Code Playgroud)
为什么?我在 Linux 上使用 Python 3.7.3。
python asynchronous multiprocessing python-3.x python-multiprocessing
我试图同时运行两个 python 文件(A.py 和 B.py)。我尝试以两种不同的方式运行它们,并使用两个单独的命令来运行它们,效果很好。
问题是,我希望有一个脚本文件来并行运行它们。我尝试了多重处理,如下代码:
if __name__ == '__main__':
jobs=[]
jobs.append(Process(target=A.start))
jobs.append(Process(target=B.start))
for job in jobs:
job.start()
for job in jobs:
job.join()
Run Code Online (Sandbox Code Playgroud)
结果是它运行 A 和 B 两次,我希望它们每个只运行一次。
问题的原因是什么?我该如何解决?或者还有其他解决方案可以并行运行两个 python 文件吗?
提前感谢您的帮助。
对于导入信息,我有三个文件:A.py、B.py 和 run.py。
在 A.py 中,我有:
from scapy.all import *
from scapy.layers.http import HTTPRequest
from scapy.layers.http import HTTPResponse
from colorama import init, Fore
import docker
import time
import redis
Run Code Online (Sandbox Code Playgroud)
在 B.py 中,我有:
import json
import docker
import socket
import time
import psutil
import socket
import redis
import …Run Code Online (Sandbox Code Playgroud) 我想通过利用多重处理来部分更改大型 numpy 数组中的值。
也就是说我最后想要得到[[100, 100, 100], [100, 100, 100]]。
但是,以下代码是错误的,它显示“RuntimeError:SynchronizedArray 对象只能通过继承在进程之间共享”
我应该怎么办?谢谢。
import numpy as np
import multiprocessing
from multiprocessing import RawArray, Array
def change_array(array, i, j):
X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
X_np[i, j] = 100
print(np.frombuffer(array.get_obj()))
if __name__ == '__main__':
X_shape = (2, 3)
data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
X = Array('d', X_shape[0] * X_shape[1])
# Wrap X as an numpy array so we can easily manipulates its data.
X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
# Copy data to …Run Code Online (Sandbox Code Playgroud) 我一直在阅读类似这样的帖子,但其中任何一个似乎都适合我的情况。我正在尝试使用 Python 中的多重处理并行化以下玩具示例,以在 for 循环内填充 Numpy 数组:
import numpy as np
from multiprocessing import Pool
import time
def func1(x, y=1):
return x**2 + y
def func2(n, parallel=False):
my_array = np.zeros((n))
# Parallelized version:
if parallel:
pool = Pool(processes=6)
for idx, val in enumerate(range(1, n+1)):
result = pool.apply_async(func1, [val])
my_array[idx] = result.get()
pool.close()
# Not parallelized version:
else:
for i in range(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(60000)
end = time.time() …Run Code Online (Sandbox Code Playgroud) python parallel-processing numpy python-3.x python-multiprocessing
我在 Precision 5520 笔记本电脑和 Google Compute Engine 的 c2-standard-4 上运行以下代码:
import multiprocessing as mp
import time
def foo():
before = time.time()
sum = 0
for i in range(1, 100000000):
sum += i
print(time.time() - before, sum)
for i in range(mp.cpu_count()):
mp.Process(target=foo).start()
Run Code Online (Sandbox Code Playgroud)
在这两种情况下,mp.cpu_count()都是 4。然而,出乎我意料的是,笔记本电脑上的每次计算需要 5.2 秒,而 GCE 上则需要 8.6 秒。
该笔记本电脑运行 Xeon E3-1505M v6 @ 3GHz。它是一个强大的 CPU,但我认为笔记本电脑的 CPU 无法与 Google HPC 服务器上的 CPU 相比(例如,由于热量限制)。
笔记本电脑上的Python版本是3.8.5。GCE 的 Python 版本为 3.9.2。
为什么会这样呢?
更新根据 @John Hanley 的回复,我更改了 Google Cloud Platform 的配置,因此每个核心仅使用一个 …
python performance google-cloud-platform python-multiprocessing
#!/usr/bin/env python3
import multiprocessing as mp
def child_process():
print('Hi! My name is', __name__)
print('Hi! My name is', __name__)
if __name__ == '__main__':
mp.Process(target=child_process).start()
Run Code Online (Sandbox Code Playgroud)
我很困惑
__mp_main__?代码:
if __name__ == "__main__":
p = ProcessPoolExecutor()
p.submit(lambda x: print(x), "something") # doesn't work
p.submit(print, "something") # works fine
time.sleep(0.5)
Run Code Online (Sandbox Code Playgroud)
为什么这是有道理的?
我正在将一些代码从纯 Python 重写为 JAX。我已经达到了这样的程度:在我的旧代码中,我使用 Python 的多处理模块来并行计算单个节点中所有 CPU 核心上的函数,如下所示:
# start pool process
pool = multiprocessing.Pool(processes=10) # if node has 10 CPU cores, start 10 processes
# use pool.map to evaluate function(input) for each input in parallel
# suppose len(inputs) is very large and 10 inputs are processed in parallel at a time
# store the results in a list called out
out = pool.map(function,inputs)
# close pool processes to free memory
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
我知道 JAX 有 vmap 和 pmap,但我不明白它们是否可以直接替代我上面使用的 multiprocessing.pool.map。 …
parallel-processing multiprocessing spmd python-multiprocessing jax