相关疑难解决方法(0)

可以在两个独立的进程之间共享内存数据吗?

我有一个使用Twisted的xmlrpc服务器.服务器有大量的数据存储在内存中.是否可以运行一个辅助的单独的xmlrpc服务器,它可以访问第一个服务器中的内存中的对象?

因此,serverA启动并创建一个对象.serverB启动并可以从serverA中的对象读取.

*编辑*

要共享的数据是一百万个元组的列表.

python

59
推荐指数
4
解决办法
5万
查看次数

多处理中的共享内存

我有三个大清单.首先包含bitarrays(模块bitarray 0.8.0),另外两个包含整数数组.

l1=[bitarray 1, bitarray 2, ... ,bitarray n]
l2=[array 1, array 2, ... , array n]
l3=[array 1, array 2, ... , array n]
Run Code Online (Sandbox Code Playgroud)

这些数据结构需要相当多的RAM(总共约16GB).

如果我使用以下方式启动12个子流程:

multiprocessing.Process(target=someFunction, args=(l1,l2,l3))
Run Code Online (Sandbox Code Playgroud)

这是否意味着将为每个子流程复制l1,l2和l3,或者子流程是否会共享这些列表?或者更直接,我会使用16GB或192GB的RAM吗?

someFunction将从这些列表中读取一些值,然后根据读取的值执行一些计算.结果将返回到父进程.someIunction不会修改列表l1,l2和l3.

因此,我认为子流程不需要也不会复制这些巨大的列表,而只是与父级共享它们.这意味着由于linux下的写时复制方法,该程序将占用16GB的RAM(无论我启动多少个子进程)?我是正确的还是我错过了会导致列表被复制的内容?

编辑:在阅读了关于这个主题的更多内容后,我仍然感到困惑.一方面,Linux使用copy-on-write,这意味着不会复制任何数据.另一方面,访问该对象将改变其重新计数(我仍然不确定为什么以及这意味着什么).即便如此,是否会复制整个对象?

例如,如果我定义someFunction如下:

def someFunction(list1, list2, list3):
    i=random.randint(0,99999)
    print list1[i], list2[i], list3[i]
Run Code Online (Sandbox Code Playgroud)

是否使用此函数意味着将为每个子流程完全复制l1,l2和l3?

有没有办法检查这个?

EDIT2在子流程运行的同时读取更多内容并监视系统的总内存使用情况后,似乎确实为每个子流程复制了整个对象.它似乎是因为引用计数.

在我的程序中实际上不需要l1,l2和l3的引用计数.这是因为l1,l2和l3将保留在内存中(未更改),直到父进程退出.在此之前,不需要释放这些列表使用的内存.事实上,我确信引用计数将保持在0以上(对于这些列表和这些列表中的每个对象),直到程序退出.

所以现在问题变成了,我怎样才能确保不会将对象复制到每个子进程?我可以禁用这些列表和这些列表中的每个对象的引用计数吗?

EDIT3只是一个额外的说明.子进程并不需要修改l1,l2l3或在这些列表中的任何对象.子进程只需要能够引用其中一些对象,而不会导致为每个子进程复制内存.

python shared-memory multiprocessing large-data

56
推荐指数
4
解决办法
8万
查看次数

是否将共享只读数据复制到不同进程以进行多处理?

我看到的那段代码看起来像这样:

glbl_array = # a 3 Gb array

def my_func( args, def_param = glbl_array):
    #do stuff on args and def_param

if __name__ == '__main__':
  pool = Pool(processes=4)
  pool.map(my_func, range(1000))
Run Code Online (Sandbox Code Playgroud)

有没有办法确保(或鼓励)不同的进程没有获得glbl_array的副本但共享它.如果没有办法停止复制,我将使用memmapped数组,但我的访问模式不是很规律,所以我希望memmapped数组更慢.以上似乎是第一个尝试的事情.这是在Linux上.我只是想从Stackoverflow获得一些建议,并且不想惹恼系统管理员.你认为它会帮助,如果第二个参数是像一个真正的不可变对象glbl_array.tostring().

python numpy multiprocessing

52
推荐指数
3
解决办法
4万
查看次数

python中的多处理 - 在多个进程之间共享大对象(例如pandas dataframe)

我更精确地使用Python多处理

from multiprocessing import Pool
p = Pool(15)

args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple
res = p.map_async(func, args) #func is some arbitrary function
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

这种方法具有巨大的内存消耗; 几乎占用了我所有的RAM(此时它变得非常慢,因此使多处理非常无用).我假设问题是这df是一个巨大的对象(一个大型的pandas数据帧),它会被复制到每个进程.我试过使用multiprocessing.Value共享数据帧而不复制

shared_df = multiprocessing.Value(pandas.DataFrame, df)
args = [(shared_df, config1), (shared_df, config2), ...] 
Run Code Online (Sandbox Code Playgroud)

(正如Python多处理共享内存中所建议的那样),但是这给了我TypeError: this type has no size(与在Python进程之间共享一个复杂对象相同,遗憾的是我不理解答案).

我第一次使用多处理,也许我的理解还不够好.是multiprocessing.Value实际上即使在这种情况下使用了正确的事情?我已经看到了其他建议(例如队列),但现在有点困惑.有什么选择可以共享内存,在这种情况下哪一个最好?

python multiprocessing pandas

36
推荐指数
4
解决办法
2万
查看次数

多处理IOError:错误的消息长度

IOError: bad message length在向map函数传递大参数时得到了一个.我怎么能避免这个?我设置N=1500或更大时会发生错误.

代码是:

import numpy as np
import multiprocessing

def func(args):
    i=args[0]
    images=args[1]
    print i
    return 0

N=1500       #N=1000 works fine

images=[]
for i in np.arange(N):
    images.append(np.random.random_integers(1,100,size=(500,500)))

iter_args=[]
for i in range(0,1):
    iter_args.append([i,images])

pool=multiprocessing.Pool()
print pool
pool.map(func,iter_args)
Run Code Online (Sandbox Code Playgroud)

在文档中multiprocessingrecv_bytes一个引发IOError 的函数.可能是因为这个吗?(https://python.readthedocs.org/en/v2.7.2/library/multiprocessing.html)

编辑 如果我使用imagesnumpy数组而不是列表,我得到一个不同的错误:SystemError: NULL result without error in PyObject_Call.有点不同的代码:

import numpy as np
import multiprocessing

def func(args):
    i=args[0]
    images=args[1]
    print i
    return 0

N=1500 …
Run Code Online (Sandbox Code Playgroud)

python numpy pool multiprocessing ioerror

21
推荐指数
2
解决办法
5441
查看次数

在python多处理中修改对象

我有一大堆自定义对象,我需要执行独立(可并行化)的任务,包括修改对象参数.我尝试过使用Manager().dict和'sharedmem'ory,但两者都没有用.例如:

import numpy as np
import multiprocessing as mp
import sharedmem as shm


class Tester:

    num = 0.0
    name = 'none'
    def __init__(self,tnum=num, tname=name):
        self.num  = tnum
        self.name = tname

    def __str__(self):
        return '%f %s' % (self.num, self.name)

def mod(test, nn):
    test.num = np.random.randn()
    test.name = nn


if __name__ == '__main__':

    num = 10

    tests = np.empty(num, dtype=object)
    for it in range(num):
        tests[it] = Tester(tnum=it*1.0)

    sh_tests = shm.empty(num, dtype=object)
    for it in range(num):
        sh_tests[it] = tests[it]
        print sh_tests[it]

    print '\n' …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing multiprocessing

14
推荐指数
2
解决办法
9954
查看次数

Python多处理比单个处理花费更长的时间

我正按顺序在3个不同的numpy 2D数组上执行一些大型计算.阵列很大,每个25000x25000.每次计算都需要很长时间,所以我决定在服务器上的3个CPU内核上并行运行其中的3个.我遵循标准的多处理指南并创建了2个进程和一个worker函数.两个计算正在通过这两个进程运行,第三个计算在本地运行而没有单独的进程.我传递巨大的数组作为过程的参数,如:

p1 = Process(target = Worker, args = (queue1, array1, ...)) # Some other params also going

p2 = Process(target = Worker, args = (queue2, array2, ...)) # Some other params also going
Run Code Online (Sandbox Code Playgroud)

Worker函数在队列中附加的列表中发回两个numpy向量(1D数组),如:

queue.put([v1, v2])
Run Code Online (Sandbox Code Playgroud)

我没用 multiprocessing.pool

但令人惊讶的是,我没有得到加速,实际上运行速度慢了3倍.通过大型阵列需要时间吗?我无法弄清楚发生了什么.我应该使用共享内存对象而不是传递数组吗?

如果有人可以提供帮助,我将感激不尽.

谢谢.

python arrays numpy process multiprocessing

10
推荐指数
1
解决办法
3649
查看次数

使用 Python 多处理共享对象数组

对于这个问题,我参考了Python 文档中讨论“将SharedMemory类与NumPy数组一起使用,numpy.ndarray从两个不同的 Python shell访问相同的数组”中的示例

我想实现的一个主要变化是操纵类对象的数组,而不是我在下面演示的整数值。

import numpy as np
from multiprocessing import shared_memory    

# a simplistic class example
class A(): 
    def __init__(self, x): 
        self.x = x

# numpy array of class objects 
a = np.array([A(1), A(2), A(3)])       

# create a shared memory instance
shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name='psm_test0')

# numpy array backed by shared memory
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)                                    

# copy the original data into shared memory
b[:] = …
Run Code Online (Sandbox Code Playgroud)

python shared-memory multiprocessing python-3.8

9
推荐指数
1
解决办法
993
查看次数

大熊猫Dataframe并行处理

我正在访问一个非常大的Pandas数据帧作为全局变量.通过joblib并行访问此变量.

例如.

df = db.query("select id, a_lot_of_data from table")

def process(id):
    temp_df = df.loc[id]
    temp_df.apply(another_function)

Parallel(n_jobs=8)(delayed(process)(id) for id in df['id'].to_list())
Run Code Online (Sandbox Code Playgroud)

以这种方式访问​​原始df似乎是跨进程复制数据.这是意料之外的,因为原始df在任何子进程中都没有被改变?(或者是吗?)

python pandas joblib

8
推荐指数
1
解决办法
1万
查看次数

共享内存复杂可写数据结构

我有一个在大型图形结构上运行的算法,我想使其成为多线程以获得更好的性能。我看过的方法都不太符合我想要的:我希望图表存在于所有进程都可以读取和写入的共享内存中(使用锁来防止竞争条件)。本质上,我想要像 C 语言中的 OpenMP 一样的东西,其中所有内存都可以由每个线程访问。

我首先查看了线程模块,但 GIL 意味着性能提升微不足道。

我继续尝试多处理模块,正如我在该主题上找到的大多数帖子所建议的那样(例如,如何在多个进程之间共享字典?以及python multiprocessing 中的共享内存对象)。这有两个主要问题。

首先,多重处理似乎不适用于复杂的对象。考虑以下玩具问题:我有一个整数列表,想要将它们全部乘以 10,然后以任意顺序输出所有数字。我可以使用以下代码:

def multiply_list():
    manager = Manager()
    output = manager.list()
    threads = []

    for v in range(10):
        output.append(v)
    print([str(v) for v in output])

    def process(inputs, start, end):
        while start < end:
            inputs[start] *= 10
            start += 1

    t1 = Process(target=process,
        args = (output, 0, 5))
    t2 = Process(target=process,
        args = (output, 5, 10))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    print([str(v) for v in output])
Run Code Online (Sandbox Code Playgroud)

与输出:

['0', '1', …
Run Code Online (Sandbox Code Playgroud)

python multithreading

5
推荐指数
1
解决办法
1720
查看次数

dask:并行模型中的共享内存

我已经阅读了 dask 文档、博客等,但我仍然不是 100% 清楚如何做到这一点。我的用例:

  • 我有大约 10GB 的参考数据。一旦加载,它们就是只读的。通常我们将它们加载到 Dask/Pandas 数据框中
  • 我需要这些参考数据来处理(丰富、修改、转换)每天大约 500 个 mio 事件(多个文件)
  • “流程”是大约 40 个任务的管道。执行顺序是相关的(依赖性)。
  • 每个单独的任务并不复杂或耗时,主要是查找、丰富、映射等。
  • 事件之间不存在依赖性。理论上,我可以通过单独的线程处理每个事件,将输出合并到一个文件中,然后就完成了。输出事件甚至不需要与输入事件具有相同的顺序。

总之:

  • 我们可以大规模并行化事件处理
  • 每个并行线程都需要相同的 10 GB(原始)引用数据
  • 处理单个事件意味着将 40 个任务的序列/管道应用于它们
  • 每个单独的任务并不耗时(读取参考数据并修改事件)

可能的陷阱/问题:

  • 花费更多的时间在序列化/反序列化上,而不是处理数据(我们在一些使用类似管道的方法的试验中确实经历过这种情况)
  • 引用数据被多次加载,每个(并行)进程加载一次
  • 我最好想在我的笔记本电脑上开发/测试它,但我没有足够的内存来加载参考数据。可能是解决方案是否会利用内存映射?

最有效的解决方案似乎是,如果我们只能将引用数据加载到内存中一次,则使其可供处理事件的多个其他进程只读

通过在每台计算机中加载参考数据来扩展到多台计算机。将文件名推送到计算机以执行。

知道如何实现这一目标吗?

非常感谢你的帮助

python pandas joblib dask

5
推荐指数
1
解决办法
2767
查看次数

检查 Linux 系统上 python 多处理中的 fork 行为

我必须从许多进程中访问一组大型且不可选取的 python 对象。因此,我想确保这些对象没有被完全复制。

根据这篇文章和这篇文章中的评论,对象不会被复制(在 UNIX 系统上),除非它们被更改。然而,引用一个对象将会改变它的引用计数,而引用计数又会被复制。

到目前为止这是正确的吗?由于我担心的是大型对象的大小,因此如果复制这些对象的小部分,我不会有问题。

为了确保我正确理解所有内容并且不会发生意外情况,我实现了一个小测试程序:

from multiprocessing import Pool

def f(arg):
    print(l, id(l), object.__repr__(l))
    l[arg] = -1
    print(l, id(l), object.__repr__(l))

def test(n):
    global l
    l = list(range(n))
    with Pool() as pool: 
        pool.map(f, range(n))
    print(l, id(l), object.__repr__(l))

if __name__ == '__main__':
    test(5) 
Run Code Online (Sandbox Code Playgroud)

在 的第一行中f,我希望id(l)在所有函数调用中返回相同的数字,因为列表在检查之前没有更改id

另一方面,在 的第三行中fid(l)应该在每个方法调用中返回不同的数字,因为列表在第二行中发生了更改。

然而,程序的输出让我感到困惑。

[0, 1, 2, 3, 4] 139778408436488 <list object at 0x7f20b261d308>
[-1, 1, 2, 3, 4] 139778408436488 <list …
Run Code Online (Sandbox Code Playgroud)

python fork shared-memory multiprocessing

3
推荐指数
1
解决办法
1685
查看次数