假设我有一个大内存numpy数组,我有一个函数func,它接受这个巨大的数组作为输入(连同一些其他参数).func具有不同参数可以并行运行.例如:
def func(arr, param):
# do stuff to arr, param
# build array arr
pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
Run Code Online (Sandbox Code Playgroud)
如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中.
有没有办法让不同的进程共享同一个数组?此数组对象是只读的,永远不会被修改.
更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法分享它?
[EDITED]
我读了答案,但我仍然有点困惑.由于fork()是copy-on-write,因此在python多处理库中生成新进程时不应调用任何额外的成本.但是下面的代码表明存在巨大的开销:
from multiprocessing import Pool, Manager
import numpy as np;
import time
def f(arr):
return len(arr)
t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;
pool = Pool(processes = 6)
t = …Run Code Online (Sandbox Code Playgroud) python parallel-processing numpy shared-memory multiprocessing
我更精确地使用Python多处理
from multiprocessing import Pool
p = Pool(15)
args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple
res = p.map_async(func, args) #func is some arbitrary function
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
这种方法具有巨大的内存消耗; 几乎占用了我所有的RAM(此时它变得非常慢,因此使多处理非常无用).我假设问题是这df是一个巨大的对象(一个大型的pandas数据帧),它会被复制到每个进程.我试过使用multiprocessing.Value共享数据帧而不复制
shared_df = multiprocessing.Value(pandas.DataFrame, df)
args = [(shared_df, config1), (shared_df, config2), ...]
Run Code Online (Sandbox Code Playgroud)
(正如Python多处理共享内存中所建议的那样),但是这给了我TypeError: this type has no size(与在Python进程之间共享一个复杂对象相同?,遗憾的是我不理解答案).
我第一次使用多处理,也许我的理解还不够好.是multiprocessing.Value实际上即使在这种情况下使用了正确的事情?我已经看到了其他建议(例如队列),但现在有点困惑.有什么选择可以共享内存,在这种情况下哪一个最好?
我正在尝试安排重复事件在Python 3中每分钟运行一次.
我见过上课,sched.scheduler但我想知道是否还有其他方法可以做到.我听说提到我可以使用多个线程,我不介意这样做.
我基本上要求一些JSON,然后解析它; 它的价值随着时间而变化.
要使用sched.scheduler我必须创建一个循环来请求它安排甚至运行一小时:
scheduler = sched.scheduler(time.time, time.sleep)
# Schedule the event. THIS IS UGLY!
for i in range(60):
scheduler.enter(3600 * i, 1, query_rate_limit, ())
scheduler.run()
Run Code Online (Sandbox Code Playgroud)
有什么其他方法可以做到这一点?
这个问题来自这里.
我有三个包含python对象(l1,l2和l3)的大型列表.这些列表是在程序启动时创建的,它们总共需要16GB的RAM.该程序将专门用于linux.
创建后,我不需要以任何方式或形式修改这些列表或这些列表中的对象.它们必须保留在内存中,直到程序退出.
我在我的程序中使用os.fork()和多处理模块来生成多个子进程(当前最多20个).这些子流程中的每一个都需要能够读取三个列表(l1,l2和l3).
我的程序在其他方面工作正常并且非常快.但是我遇到内存消耗问题.由于Linux上的写时复制方法,我希望每个子进程都可以使用这三个列表而不将它们复制到内存中.但是情况并非如此,因为引用任何这些列表中的任何对象都会增加相关的引用计数,从而导致整个内存页面被复制.
所以我的问题是:
我可以禁用引用计数l1,l2以及l3这些列表中的所有对象吗?基本上将整个对象(包括诸如引用计数之类的元数据)设置为只读,以便在任何情况下都不会被修改(我认为,这将允许我利用写入时复制).
目前我担心我被迫转移到另一种编程语言来完成这项任务,因为我目前不需要的"功能"(引用计数),但仍然强加给我并导致不必要的问题.
为什么multiprocessingpython pickle对象的包在进程之间传递它们,即将不同进程的结果返回给主解释器进程?这可能是一个非常天真的问题,但是为什么不能处理A说出处理B"对象x在内存中的点y,它现在是你的"而不必执行将对象表示为字符串所必需的操作.
我正在使用python 2.7.10.我读了很多文件,将它们存储到一个大的列表中,然后尝试调用多处理并将大列表传递给那些多进程,这样每个进程都可以访问这个大的列表并进行一些计算.
我正在使用像这样的Pool:
def read_match_wrapper(args):
args2 = args[0] + (args[1],)
read_match(*args2)
pool = multiprocessing.Pool(processes=10)
result=pool.map(read_match_wrapper,itertools.izip(itertools.repeat((ped_list,chr_map,combined_id_to_id,chr)),range(10)))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
基本上,我将多个变量传递给'read_match'函数.为了使用pool.map,我编写了'read_match_wrapper'函数.我不需要从这些过程中获得任何结果.我只是想让他们跑步和完成.
当我的数据列表'ped_list'非常小时,我可以完成整个过程.当我加载所有数据(如10G)时,它生成的所有多进程都显示为"S",似乎根本不起作用.
我不知道你可以通过池访问多少数据?我真的需要帮助!谢谢!
我有一个大型模型文件,用于在 Flask 中构建的网络服务中,然后通过 Gunicorn 提供服务。文件夹的结构是这样的:
A.py
Folder_1\
__init__.py
B.py
Run Code Online (Sandbox Code Playgroud)
模型被加载__init__.py和使用B.py
入口点是A.py包含@app.routes, etc.
我从A.pygunicorn开始并使用--preloadoption预加载应用程序,有8 个工人。
我在 8 核上面临 100% 的 CPU 利用率;显然,请求被卡在应用服务器上,没有被转发到数据库。该模型是否也已预加载并可供所有 8 个工作人员使用,即它是否在工作进程之间共享?如果没有,我必须加载模型,A.py以便为所有工作人员预加载模型。
我认为每个工作进程都在加载模型,并且由于模型很大,工作人员被困在那里。
编辑 1:因为我被告知这可能是一个重复的问题,我想澄清我不是在问python如何处理共享对象。我知道使用multiprocessing. 就我而言,我使用 --preload 选项从 gunicorn 启动了 8 个工作人员的烧瓶服务器,我的应用程序有 8 个实例正在运行。我的问题是,由于代码是在工作人员分叉之前预加载的,gunicorn 工作人员将共享相同的模型对象,或者他们每个人都有一个单独的副本。?
我正在研究一个以加法模型为中心的CPU密集型ML问题.由于添加是主要操作,我可以将输入数据分成多个部分并生成多个模型,然后通过覆盖__add__方法合并.
与多处理相关的代码如下所示:
def pool_worker(filename, doshuffle):
print(f"Processing file: {filename}")
with open(filename, 'r') as f:
partial = FragmentModel(order=args.order, indata=f, shuffle=doshuffle)
return partial
def generateModel(is_mock=False, save=True):
model = None
with ThreadPool(args.nthreads) as pool:
from functools import partial
partial_models = pool.imap_unordered(partial(pool_worker, doshuffle=is_mock), args.input)
i = 0
for m in partial_models:
logger.info(f'Starting to merge model {i}')
if model is None:
import copy
model = copy.deepcopy(m)
else:
model += m
logger.info(f'Done merging...')
i += 1
return model
Run Code Online (Sandbox Code Playgroud)
问题是内存消耗随着模型顺序的增加而呈指数级增长,因此在第4阶段,模型的每个实例大约为4-5 GB,这会导致线程池崩溃,因为中间模型对象不是可拾取的.
我读到了这一点,看起来即使酸洗不是问题,传递这样的数据仍然是非常低效的,如对此答案的评论.
但是,关于如何为此目的使用共享内存的指导很少.是否可以避免此问题而无需更改模型对象的内部结构?
我想知道如何处理它,我从子进程到父进程获得一个变量/值。
我正在将子进程作为脚本运行。父级看起来像:
import subprocess
p = subprocess.Popen('abaqus python getData.py', shell=True)
p_status = p.wait()
print b
Run Code Online (Sandbox Code Playgroud)
孩子长这样:
from numpy import *
if __name__ == "__main__":
b = [0,1,2,3] # output is a list of integers
global a = b
Run Code Online (Sandbox Code Playgroud)
我对python完全陌生。我认为问题是,我不能以这种方式存储变量并使它们“公开”给父级?我是否必须将它们写在 *.txt 或类似的文件中并使用 numpy.loadtxt() 获取它们?
我使用 2 个 python 进程,我想知道如何共享和更新变量。我设法将变量发送到进程,但该变量在进程期间没有更新。
在我的代码中,当进程worker启动时,它每 3 秒增加一次变量a。同时这个过程my_service不断展现出价值a。
#!/usr/bin/python
# -*- coding: utf-8 -*-
#import multiprocessing as mp
#from multiprocessing import Process
import multiprocessing
import time
from globalvar import *
a=8
#toto=8
def worker():
name = multiprocessing.current_process().name
# print (name,"Starting")
# time.sleep(2)
# print (name, "Exiting")
for a in range(1,4):
print ("worker=",a)
time.sleep(3)
def my_service(az):
name = multiprocessing.current_process().name
# print (name,"Starting")
# time.sleep(3)
# print (name, "Exiting")
while True:
print ("my_service=",az)
time.sleep(2) …Run Code Online (Sandbox Code Playgroud) 我有一个 18Gb 的 pickle 文件,我需要跨进程访问它。我尝试使用
from multiprocessing import Manager
import cPickle as pkl
manager = Manager()
data = manager.dict(pkl.load(open("xyz.pkl","rb")))
Run Code Online (Sandbox Code Playgroud)
但是,我遇到以下问题:
IOError: [Errno 11] Resource temporarily unavailable
Run Code Online (Sandbox Code Playgroud)
有人建议这可能是因为套接字超时,但它似乎不是因为增加超时没有帮助。我该怎么做。还有其他有效的跨进程共享数据的方法吗?
python ×11
bigdata ×1
flask ×1
gunicorn ×1
numpy ×1
pandas ×1
pickle ×1
pool ×1
popen ×1
python-3.x ×1
refcounting ×1
sleep ×1
subprocess ×1
threadpool ×1
timing ×1
variables ×1