相关疑难解决方法(0)

多处理中的共享内存对象

假设我有一个大内存numpy数组,我有一个函数func,它接受这个巨大的数组作为输入(连同一些其他参数).func具有不同参数可以并行运行.例如:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
Run Code Online (Sandbox Code Playgroud)

如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中.

有没有办法让不同的进程共享同一个数组?此数组对象是只读的,永远不会被修改.

更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法分享它?

[EDITED]

我读了答案,但我仍然有点困惑.由于fork()是copy-on-write,因此在python多处理库中生成新进程时不应调用任何额外的成本.但是下面的代码表明存在巨大的开销:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing numpy shared-memory multiprocessing

112
推荐指数
4
解决办法
10万
查看次数

python中的多处理 - 在多个进程之间共享大对象(例如pandas dataframe)

我更精确地使用Python多处理

from multiprocessing import Pool
p = Pool(15)

args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple
res = p.map_async(func, args) #func is some arbitrary function
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

这种方法具有巨大的内存消耗; 几乎占用了我所有的RAM(此时它变得非常慢,因此使多处理非常无用).我假设问题是这df是一个巨大的对象(一个大型的pandas数据帧),它会被复制到每个进程.我试过使用multiprocessing.Value共享数据帧而不复制

shared_df = multiprocessing.Value(pandas.DataFrame, df)
args = [(shared_df, config1), (shared_df, config2), ...] 
Run Code Online (Sandbox Code Playgroud)

(正如Python多处理共享内存中所建议的那样),但是这给了我TypeError: this type has no size(与在Python进程之间共享一个复杂对象相同,遗憾的是我不理解答案).

我第一次使用多处理,也许我的理解还不够好.是multiprocessing.Value实际上即使在这种情况下使用了正确的事情?我已经看到了其他建议(例如队列),但现在有点困惑.有什么选择可以共享内存,在这种情况下哪一个最好?

python multiprocessing pandas

36
推荐指数
4
解决办法
2万
查看次数

在Python 3中安排重复事件

我正在尝试安排重复事件在Python 3中每分钟运行一次.

我见过上课,sched.scheduler但我想知道是否还有其他方法可以做到.我听说提到我可以使用多个线程,我不介意这样做.

我基本上要求一些JSON,然后解析它; 它的价值随着时间而变化.

要使用sched.scheduler我必须创建一个循环来请求它安排甚至运行一小时:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()
Run Code Online (Sandbox Code Playgroud)

有什么其他方法可以做到这一点?

python timing scheduled-tasks python-3.x

29
推荐指数
7
解决办法
7万
查看次数

Python禁用某些对象的引用计数

这个问题来自这里.

我有三个包含python对象(l1,l2l3)的大型列表.这些列表是在程序启动时创建的,它们总共需要16GB的RAM.该程序将专门用于linux.

创建后,我不需要以任何方式或形式修改这些列表或这些列表中的对象.它们必须保留在内存中,直到程序退出.

我在我的程序中使用os.fork()和多处理模块来生成多个子进程(当前最多20个).这些子流程中的每一个都需要能够读取三个列表(l1,l2l3).

我的程序在其他方面工作正常并且非常快.但是我遇到内存消耗问题.由于Linux上的写时复制方法,我希望每个子进程都可以使用这三个列表而不将它们复制到内存中.但是情况并非如此,因为引用任何这些列表中的任何对象都会增加相关的引用计数,从而导致整个内存页面被复制.

所以我的问题是:

我可以禁用引用计数l1,l2以及l3这些列表中的所有对象吗?基本上将整个对象(包括诸如引用计数之类的元数据)设置为只读,以便在任何情况下都不会被修改(我认为,这将允许我利用写入时复制).

目前我担心我被迫转移到另一种编程语言来完成这项任务,因为我目前不需要的"功能"(引用计数),但仍然强加给我并导致不必要的问题.

python refcounting

10
推荐指数
1
解决办法
815
查看次数

为什么python多处理pickle对象在进程之间传递对象?

为什么multiprocessingpython pickle对象的包在进程之间传递它们,即将不同进程的结果返回给主解释器进程?这可能是一个非常天真的问题,但是为什么不能处理A说出处理B"对象x在内存中的点y,它现在是你的"而不必执行将对象表示为字符串所必需的操作.

python pickle multiprocessing

9
推荐指数
1
解决办法
3086
查看次数

python多处理,大数据将进程转变为睡眠状态

我正在使用python 2.7.10.我读了很多文件,将它们存储到一个大的列表中,然后尝试调用多处理并将大列表传递给那些多进程,这样每个进程都可以访问这个大的列表并进行一些计算.

我正在使用像这样的Pool:

def read_match_wrapper(args):
    args2 = args[0] + (args[1],)
    read_match(*args2)

 pool = multiprocessing.Pool(processes=10)
 result=pool.map(read_match_wrapper,itertools.izip(itertools.repeat((ped_list,chr_map,combined_id_to_id,chr)),range(10)))
 pool.close()
 pool.join()
Run Code Online (Sandbox Code Playgroud)

基本上,我将多个变量传递给'read_match'函数.为了使用pool.map,我编写了'read_match_wrapper'函数.我不需要从这些过程中获得任何结果.我只是想让他们跑步和完成.

当我的数据列表'ped_list'非常小时,我可以完成整个过程.当我加载所有数据(如10G)时,它生成的所有多进程都显示为"S",似乎根本不起作用.

我不知道你可以通过池访问多少数据?我真的需要帮助!谢谢!

python sleep pool multiprocessing bigdata

6
推荐指数
1
解决办法
1130
查看次数

使用 --preload 在 gunicorn 中的工人之间共享内存

我有一个大型模型文件,用于在 Flask 中构建的网络服务中,然后通过 Gunicorn 提供服务。文件夹的结构是这样的:

A.py
Folder_1\
    __init__.py
    B.py
Run Code Online (Sandbox Code Playgroud)

模型被加载__init__.py和使用B.py 入口点是A.py包含@app.routes, etc.

我从A.pygunicorn开始并使用--preloadoption预加载应用程序,有8 个工人

我在 8 核上面临 100% 的 CPU 利用率;显然,请求被卡在应用服务器上,没有被转发到数据库。该模型是否也已预加载并可供所有 8 个工作人员使用,即它是否在工作进程之间共享?如果没有,我必须加载模型,A.py以便为所有工作人员预加载模型。

我认为每个工作进程都在加载模型,并且由于模型很大,工作人员被困在那里。

编辑 1:因为我被告知这可能是一个重复的问题,我想澄清我不是在问python如何处理共享对象。我知道使用multiprocessing. 就我而言,我使用 --preload 选项从 gunicorn 启动了 8 个工作人员的烧瓶服务器,我的应用程序有 8 个实例正在运行。我的问题是,由于代码是在工作人员分叉之前预加载的,gunicorn 工作人员将共享相同的模型对象,或者他们每个人都有一个单独的副本。?

python flask gunicorn

6
推荐指数
1
解决办法
7492
查看次数

如何使用共享内存而不是通过多个进程之间的pickle传递对象

我正在研究一个以加法模型为中心的CPU密集型ML问题.由于添加是主要操作,我可以将输入数据分成多个部分并生成多个模型,然后通过覆盖__add__方法合并.

与多处理相关的代码如下所示:

def pool_worker(filename, doshuffle):
    print(f"Processing file: {filename}")
    with open(filename, 'r') as f:
        partial = FragmentModel(order=args.order, indata=f, shuffle=doshuffle)
        return partial

def generateModel(is_mock=False, save=True):
    model = None
    with ThreadPool(args.nthreads) as pool:
        from functools import partial
        partial_models = pool.imap_unordered(partial(pool_worker, doshuffle=is_mock), args.input)
        i = 0
        for m in partial_models:
            logger.info(f'Starting to merge model {i}')
            if model is None:
                import copy
                model = copy.deepcopy(m)
            else:
                model += m
            logger.info(f'Done merging...')
            i += 1

    return model
Run Code Online (Sandbox Code Playgroud)

问题是内存消耗随着模型顺序的增加而呈指数级增长,因此在第4阶段,模型的每个实例大约为4-5 GB,这会导致线程池崩溃,因为中间模型对象不是可拾取的.

我读到了这一点,看起来即使酸洗不是问题,传递这样的数据仍然是非常低效的,如对此答案的评论.

但是,关于如何为此目的使用共享内存的指导很少.是否可以避免此问题而无需更改模型对象的内部结构?

python multiprocessing threadpool

6
推荐指数
2
解决办法
1078
查看次数

subprocess.Popen,从子进程(子进程)获取变量

我想知道如何处理它,我从子进程到父进程获得一个变量/值。

我正在将子进程作为脚本运行。父级看起来像:

import subprocess
p = subprocess.Popen('abaqus python getData.py', shell=True)
p_status = p.wait()
print b
Run Code Online (Sandbox Code Playgroud)

孩子长这样:

from numpy import *

if __name__ == "__main__":

    b = [0,1,2,3]  # output is a list of integers
    global a = b
Run Code Online (Sandbox Code Playgroud)

我对python完全陌生。我认为问题是,我不能以这种方式存储变量并使它们“公开”给父级?我是否必须将它们写在 *.txt 或类似的文件中并使用 numpy.loadtxt() 获取它们?

python subprocess popen

5
推荐指数
1
解决办法
1987
查看次数

将变量更新为多处理 python

我使用 2 个 python 进程,我想知道如何共享和更新变量。我设法将变量发送到进程,但该变量在进程期间没有更新。

在我的代码中,当进程worker启动时,它每 3 秒增加一次变量a。同时这个过程my_service不断展现出价值a

#!/usr/bin/python
# -*- coding: utf-8 -*-

#import multiprocessing as mp
#from multiprocessing import Process
import multiprocessing

import time
from globalvar import *
a=8
#toto=8

def worker():
    name = multiprocessing.current_process().name
    # print (name,"Starting")
    # time.sleep(2)
    # print (name, "Exiting")
    for a in range(1,4):
        print ("worker=",a)
        time.sleep(3)

def my_service(az):
    name = multiprocessing.current_process().name
    # print (name,"Starting")
    # time.sleep(3)
    # print (name, "Exiting")
    while True:
        print ("my_service=",az)
        time.sleep(2) …
Run Code Online (Sandbox Code Playgroud)

python variables multiprocessing

3
推荐指数
1
解决办法
2994
查看次数

如何在python的多处理中跨进程共享大型只读字典/列表?

我有一个 18Gb 的 pickle 文件,我需要跨进程访问它。我尝试使用

from multiprocessing import Manager
import cPickle as pkl
manager = Manager()
data = manager.dict(pkl.load(open("xyz.pkl","rb")))
Run Code Online (Sandbox Code Playgroud)

但是,我遇到以下问题:

IOError: [Errno 11] Resource temporarily unavailable 
Run Code Online (Sandbox Code Playgroud)

有人建议这可能是因为套接字超时,但它似乎不是因为增加超时没有帮助。我该怎么做。还有其他有效的跨进程共享数据的方法吗?

python multithreading multiprocessing

1
推荐指数
1
解决办法
1095
查看次数