我有一个使用Twisted的xmlrpc服务器.服务器有大量的数据存储在内存中.是否可以运行一个辅助的单独的xmlrpc服务器,它可以访问第一个服务器中的内存中的对象?
因此,serverA启动并创建一个对象.serverB启动并可以从serverA中的对象读取.
*编辑*
要共享的数据是一百万个元组的列表.
我有三个大清单.首先包含bitarrays(模块bitarray 0.8.0),另外两个包含整数数组.
l1=[bitarray 1, bitarray 2, ... ,bitarray n]
l2=[array 1, array 2, ... , array n]
l3=[array 1, array 2, ... , array n]
Run Code Online (Sandbox Code Playgroud)
这些数据结构需要相当多的RAM(总共约16GB).
如果我使用以下方式启动12个子流程:
multiprocessing.Process(target=someFunction, args=(l1,l2,l3))
Run Code Online (Sandbox Code Playgroud)
这是否意味着将为每个子流程复制l1,l2和l3,或者子流程是否会共享这些列表?或者更直接,我会使用16GB或192GB的RAM吗?
someFunction将从这些列表中读取一些值,然后根据读取的值执行一些计算.结果将返回到父进程.someIunction不会修改列表l1,l2和l3.
因此,我认为子流程不需要也不会复制这些巨大的列表,而只是与父级共享它们.这意味着由于linux下的写时复制方法,该程序将占用16GB的RAM(无论我启动多少个子进程)?我是正确的还是我错过了会导致列表被复制的内容?
编辑:在阅读了关于这个主题的更多内容后,我仍然感到困惑.一方面,Linux使用copy-on-write,这意味着不会复制任何数据.另一方面,访问该对象将改变其重新计数(我仍然不确定为什么以及这意味着什么).即便如此,是否会复制整个对象?
例如,如果我定义someFunction如下:
def someFunction(list1, list2, list3):
i=random.randint(0,99999)
print list1[i], list2[i], list3[i]
Run Code Online (Sandbox Code Playgroud)
是否使用此函数意味着将为每个子流程完全复制l1,l2和l3?
有没有办法检查这个?
EDIT2在子流程运行的同时读取更多内容并监视系统的总内存使用情况后,似乎确实为每个子流程复制了整个对象.它似乎是因为引用计数.
在我的程序中实际上不需要l1,l2和l3的引用计数.这是因为l1,l2和l3将保留在内存中(未更改),直到父进程退出.在此之前,不需要释放这些列表使用的内存.事实上,我确信引用计数将保持在0以上(对于这些列表和这些列表中的每个对象),直到程序退出.
所以现在问题变成了,我怎样才能确保不会将对象复制到每个子进程?我可以禁用这些列表和这些列表中的每个对象的引用计数吗?
EDIT3只是一个额外的说明.子进程并不需要修改l1,l2并l3或在这些列表中的任何对象.子进程只需要能够引用其中一些对象,而不会导致为每个子进程复制内存.
我看到的那段代码看起来像这样:
glbl_array = # a 3 Gb array
def my_func( args, def_param = glbl_array):
#do stuff on args and def_param
if __name__ == '__main__':
pool = Pool(processes=4)
pool.map(my_func, range(1000))
Run Code Online (Sandbox Code Playgroud)
有没有办法确保(或鼓励)不同的进程没有获得glbl_array的副本但共享它.如果没有办法停止复制,我将使用memmapped数组,但我的访问模式不是很规律,所以我希望memmapped数组更慢.以上似乎是第一个尝试的事情.这是在Linux上.我只是想从Stackoverflow获得一些建议,并且不想惹恼系统管理员.你认为它会帮助,如果第二个参数是像一个真正的不可变对象glbl_array.tostring().
我更精确地使用Python多处理
from multiprocessing import Pool
p = Pool(15)
args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple
res = p.map_async(func, args) #func is some arbitrary function
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
这种方法具有巨大的内存消耗; 几乎占用了我所有的RAM(此时它变得非常慢,因此使多处理非常无用).我假设问题是这df是一个巨大的对象(一个大型的pandas数据帧),它会被复制到每个进程.我试过使用multiprocessing.Value共享数据帧而不复制
shared_df = multiprocessing.Value(pandas.DataFrame, df)
args = [(shared_df, config1), (shared_df, config2), ...]
Run Code Online (Sandbox Code Playgroud)
(正如Python多处理共享内存中所建议的那样),但是这给了我TypeError: this type has no size(与在Python进程之间共享一个复杂对象相同?,遗憾的是我不理解答案).
我第一次使用多处理,也许我的理解还不够好.是multiprocessing.Value实际上即使在这种情况下使用了正确的事情?我已经看到了其他建议(例如队列),但现在有点困惑.有什么选择可以共享内存,在这种情况下哪一个最好?
我IOError: bad message length在向map函数传递大参数时得到了一个.我怎么能避免这个?我设置N=1500或更大时会发生错误.
代码是:
import numpy as np
import multiprocessing
def func(args):
i=args[0]
images=args[1]
print i
return 0
N=1500 #N=1000 works fine
images=[]
for i in np.arange(N):
images.append(np.random.random_integers(1,100,size=(500,500)))
iter_args=[]
for i in range(0,1):
iter_args.append([i,images])
pool=multiprocessing.Pool()
print pool
pool.map(func,iter_args)
Run Code Online (Sandbox Code Playgroud)
在文档中multiprocessing有recv_bytes一个引发IOError 的函数.可能是因为这个吗?(https://python.readthedocs.org/en/v2.7.2/library/multiprocessing.html)
编辑
如果我使用imagesnumpy数组而不是列表,我得到一个不同的错误:SystemError: NULL result without error in PyObject_Call.有点不同的代码:
import numpy as np
import multiprocessing
def func(args):
i=args[0]
images=args[1]
print i
return 0
N=1500 …Run Code Online (Sandbox Code Playgroud) 我有一大堆自定义对象,我需要执行独立(可并行化)的任务,包括修改对象参数.我尝试过使用Manager().dict和'sharedmem'ory,但两者都没有用.例如:
import numpy as np
import multiprocessing as mp
import sharedmem as shm
class Tester:
num = 0.0
name = 'none'
def __init__(self,tnum=num, tname=name):
self.num = tnum
self.name = tname
def __str__(self):
return '%f %s' % (self.num, self.name)
def mod(test, nn):
test.num = np.random.randn()
test.name = nn
if __name__ == '__main__':
num = 10
tests = np.empty(num, dtype=object)
for it in range(num):
tests[it] = Tester(tnum=it*1.0)
sh_tests = shm.empty(num, dtype=object)
for it in range(num):
sh_tests[it] = tests[it]
print sh_tests[it]
print '\n' …Run Code Online (Sandbox Code Playgroud) 我正按顺序在3个不同的numpy 2D数组上执行一些大型计算.阵列很大,每个25000x25000.每次计算都需要很长时间,所以我决定在服务器上的3个CPU内核上并行运行其中的3个.我遵循标准的多处理指南并创建了2个进程和一个worker函数.两个计算正在通过这两个进程运行,第三个计算在本地运行而没有单独的进程.我传递巨大的数组作为过程的参数,如:
p1 = Process(target = Worker, args = (queue1, array1, ...)) # Some other params also going
p2 = Process(target = Worker, args = (queue2, array2, ...)) # Some other params also going
Run Code Online (Sandbox Code Playgroud)
Worker函数在队列中附加的列表中发回两个numpy向量(1D数组),如:
queue.put([v1, v2])
Run Code Online (Sandbox Code Playgroud)
我没用 multiprocessing.pool
但令人惊讶的是,我没有得到加速,实际上运行速度慢了3倍.通过大型阵列需要时间吗?我无法弄清楚发生了什么.我应该使用共享内存对象而不是传递数组吗?
如果有人可以提供帮助,我将感激不尽.
谢谢.
对于这个问题,我参考了Python 文档中讨论“将SharedMemory类与NumPy数组一起使用,numpy.ndarray从两个不同的 Python shell访问相同的数组”中的示例。
我想实现的一个主要变化是操纵类对象的数组,而不是我在下面演示的整数值。
import numpy as np
from multiprocessing import shared_memory
# a simplistic class example
class A():
def __init__(self, x):
self.x = x
# numpy array of class objects
a = np.array([A(1), A(2), A(3)])
# create a shared memory instance
shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name='psm_test0')
# numpy array backed by shared memory
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
# copy the original data into shared memory
b[:] = …Run Code Online (Sandbox Code Playgroud) 我正在访问一个非常大的Pandas数据帧作为全局变量.通过joblib并行访问此变量.
例如.
df = db.query("select id, a_lot_of_data from table")
def process(id):
temp_df = df.loc[id]
temp_df.apply(another_function)
Parallel(n_jobs=8)(delayed(process)(id) for id in df['id'].to_list())
Run Code Online (Sandbox Code Playgroud)
以这种方式访问原始df似乎是跨进程复制数据.这是意料之外的,因为原始df在任何子进程中都没有被改变?(或者是吗?)
我有一个在大型图形结构上运行的算法,我想使其成为多线程以获得更好的性能。我看过的方法都不太符合我想要的:我希望图表存在于所有进程都可以读取和写入的共享内存中(使用锁来防止竞争条件)。本质上,我想要像 C 语言中的 OpenMP 一样的东西,其中所有内存都可以由每个线程访问。
我首先查看了线程模块,但 GIL 意味着性能提升微不足道。
我继续尝试多处理模块,正如我在该主题上找到的大多数帖子所建议的那样(例如,如何在多个进程之间共享字典?以及python multiprocessing 中的共享内存对象)。这有两个主要问题。
首先,多重处理似乎不适用于复杂的对象。考虑以下玩具问题:我有一个整数列表,想要将它们全部乘以 10,然后以任意顺序输出所有数字。我可以使用以下代码:
def multiply_list():
manager = Manager()
output = manager.list()
threads = []
for v in range(10):
output.append(v)
print([str(v) for v in output])
def process(inputs, start, end):
while start < end:
inputs[start] *= 10
start += 1
t1 = Process(target=process,
args = (output, 0, 5))
t2 = Process(target=process,
args = (output, 5, 10))
t1.start()
t2.start()
t1.join()
t2.join()
print([str(v) for v in output])
Run Code Online (Sandbox Code Playgroud)
与输出:
['0', '1', …Run Code Online (Sandbox Code Playgroud) 我已经阅读了 dask 文档、博客等,但我仍然不是 100% 清楚如何做到这一点。我的用例:
总之:
可能的陷阱/问题:
最有效的解决方案似乎是,如果我们只能将引用数据加载到内存中一次,则使其可供处理事件的多个其他进程只读
通过在每台计算机中加载参考数据来扩展到多台计算机。将文件名推送到计算机以执行。
知道如何实现这一目标吗?
非常感谢你的帮助
我必须从许多进程中访问一组大型且不可选取的 python 对象。因此,我想确保这些对象没有被完全复制。
根据这篇文章和这篇文章中的评论,对象不会被复制(在 UNIX 系统上),除非它们被更改。然而,引用一个对象将会改变它的引用计数,而引用计数又会被复制。
到目前为止这是正确的吗?由于我担心的是大型对象的大小,因此如果复制这些对象的小部分,我不会有问题。
为了确保我正确理解所有内容并且不会发生意外情况,我实现了一个小测试程序:
from multiprocessing import Pool
def f(arg):
print(l, id(l), object.__repr__(l))
l[arg] = -1
print(l, id(l), object.__repr__(l))
def test(n):
global l
l = list(range(n))
with Pool() as pool:
pool.map(f, range(n))
print(l, id(l), object.__repr__(l))
if __name__ == '__main__':
test(5)
Run Code Online (Sandbox Code Playgroud)
在 的第一行中f,我希望id(l)在所有函数调用中返回相同的数字,因为列表在检查之前没有更改id。
另一方面,在 的第三行中f,id(l)应该在每个方法调用中返回不同的数字,因为列表在第二行中发生了更改。
然而,程序的输出让我感到困惑。
[0, 1, 2, 3, 4] 139778408436488 <list object at 0x7f20b261d308>
[-1, 1, 2, 3, 4] 139778408436488 <list …Run Code Online (Sandbox Code Playgroud)