假设我有一个大内存numpy数组,我有一个函数func,它接受这个巨大的数组作为输入(连同一些其他参数).func具有不同参数可以并行运行.例如:
def func(arr, param):
# do stuff to arr, param
# build array arr
pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
Run Code Online (Sandbox Code Playgroud)
如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中.
有没有办法让不同的进程共享同一个数组?此数组对象是只读的,永远不会被修改.
更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法分享它?
[EDITED]
我读了答案,但我仍然有点困惑.由于fork()是copy-on-write,因此在python多处理库中生成新进程时不应调用任何额外的成本.但是下面的代码表明存在巨大的开销:
from multiprocessing import Pool, Manager
import numpy as np;
import time
def f(arr):
return len(arr)
t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;
pool = Pool(processes = 6)
t = …Run Code Online (Sandbox Code Playgroud) python parallel-processing numpy shared-memory multiprocessing
我有一个脚本,通过imap_unordered()调用成功地执行多处理池任务集:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Run Code Online (Sandbox Code Playgroud)
但是,我num_tasks大概是250,000,所以join()锁定主线程10秒左右,我希望能够逐步回显到命令行,以显示主进程未被锁定.就像是:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
Run Code Online (Sandbox Code Playgroud)
是否有结果对象或池本身的方法指示剩余的任务数量?我尝试使用一个multiprocessing.Value对象作为计数器(在完成任务后do_work调用一个counter.value += 1 …
我有三个大清单.首先包含bitarrays(模块bitarray 0.8.0),另外两个包含整数数组.
l1=[bitarray 1, bitarray 2, ... ,bitarray n]
l2=[array 1, array 2, ... , array n]
l3=[array 1, array 2, ... , array n]
Run Code Online (Sandbox Code Playgroud)
这些数据结构需要相当多的RAM(总共约16GB).
如果我使用以下方式启动12个子流程:
multiprocessing.Process(target=someFunction, args=(l1,l2,l3))
Run Code Online (Sandbox Code Playgroud)
这是否意味着将为每个子流程复制l1,l2和l3,或者子流程是否会共享这些列表?或者更直接,我会使用16GB或192GB的RAM吗?
someFunction将从这些列表中读取一些值,然后根据读取的值执行一些计算.结果将返回到父进程.someIunction不会修改列表l1,l2和l3.
因此,我认为子流程不需要也不会复制这些巨大的列表,而只是与父级共享它们.这意味着由于linux下的写时复制方法,该程序将占用16GB的RAM(无论我启动多少个子进程)?我是正确的还是我错过了会导致列表被复制的内容?
编辑:在阅读了关于这个主题的更多内容后,我仍然感到困惑.一方面,Linux使用copy-on-write,这意味着不会复制任何数据.另一方面,访问该对象将改变其重新计数(我仍然不确定为什么以及这意味着什么).即便如此,是否会复制整个对象?
例如,如果我定义someFunction如下:
def someFunction(list1, list2, list3):
i=random.randint(0,99999)
print list1[i], list2[i], list3[i]
Run Code Online (Sandbox Code Playgroud)
是否使用此函数意味着将为每个子流程完全复制l1,l2和l3?
有没有办法检查这个?
EDIT2在子流程运行的同时读取更多内容并监视系统的总内存使用情况后,似乎确实为每个子流程复制了整个对象.它似乎是因为引用计数.
在我的程序中实际上不需要l1,l2和l3的引用计数.这是因为l1,l2和l3将保留在内存中(未更改),直到父进程退出.在此之前,不需要释放这些列表使用的内存.事实上,我确信引用计数将保持在0以上(对于这些列表和这些列表中的每个对象),直到程序退出.
所以现在问题变成了,我怎样才能确保不会将对象复制到每个子进程?我可以禁用这些列表和这些列表中的每个对象的引用计数吗?
EDIT3只是一个额外的说明.子进程并不需要修改l1,l2并l3或在这些列表中的任何对象.子进程只需要能够引用其中一些对象,而不会导致为每个子进程复制内存.
我有一个相当复杂的Python对象,我需要在多个进程之间共享.我使用启动这些过程multiprocessing.Process.当我分享一个对象multiprocessing.Queue,并multiprocessing.Pipe在其中,他们共享就好了.但是当我尝试与其他非多处理模块对象共享一个对象时,似乎Python会分叉这些对象.真的吗?
我尝试使用multiprocessing.Value.但我不确定应该是什么类型的?我的对象类叫做MyClass.但是当我尝试时multiprocess.Value(MyClass, instance),它失败了:
TypeError: this type has no size
知道发生了什么事吗?
我正在访问一个非常大的Pandas数据帧作为全局变量.通过joblib并行访问此变量.
例如.
df = db.query("select id, a_lot_of_data from table")
def process(id):
temp_df = df.loc[id]
temp_df.apply(another_function)
Parallel(n_jobs=8)(delayed(process)(id) for id in df['id'].to_list())
Run Code Online (Sandbox Code Playgroud)
以这种方式访问原始df似乎是跨进程复制数据.这是意料之外的,因为原始df在任何子进程中都没有被改变?(或者是吗?)
我有一个python pandas数据帧的字典.这本词典的总大小约为2GB.但是,当我在16个多处理中共享它时(在子进程中我只读取dict的数据而不修改它),它需要32GB ram.所以我想问一下,如果我可以在不进行复制的情况下跨多处理共享这个字典.我试图将它转换为manager.dict().但似乎需要太长时间.实现这一目标的最标准方法是什么?谢谢.
因此,假设您有一个 Python 进程,它正在以每秒大约 500 行的速度实时收集来自排队系统的数据(这可以进一步并行化以减少到大约 50 ps)并将其附加到DataFrame:
rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
recv = rq.get(block=True)
# some converting
df.append(recv, ignore_index = True)
Run Code Online (Sandbox Code Playgroud)
现在的问题是:如何根据这些数据利用 CPU?所以我充分认识到的局限性GIL,看着成多处理管理器 的命名空间,这里太,但它看起来像有一些缺点关于延迟的centerally保持数据帧。在深入研究之前,我还尝试pool.map了我认为pickle在进程之间应用的方法,这种方法很慢并且开销太大。
所以在所有这一切之后,我终于想知道,如何(如果)每秒 500 行(甚至每秒 50 行)的插入可以传输到不同的进程,同时还有一些 CPU 时间用于对子项中的数据应用统计和启发式流程?
也许在两个进程之间实现自定义 tcp 套接字或排队系统会更好?或者在pandas其他库中是否有一些实现可以真正允许快速访问父进程中的一个大数据框?我爱熊猫!
这里我创建了一个生产者-客户程序,父进程(生产者)创建了许多子进程(消费者)?然后父进程读取文件并将数据传递给子进程。
但是,这里出现了一个性能问题,在进程之间传递消息花费了太多时间?我认为?。
例如,一个200MB 的原始数据,父进程读取和预处理将花费不到8秒,而不是仅通过多进程将数据传递给子进程。管道将花费另外8秒,子进程完成剩余的工作只花费另外3 到 4秒。
所以,一个完整的工作流程花费不到 18 秒,进程之间通信的时间成本超过 40%,这比我想象的要大得多,我尝试了多进程。队列和经理,他们更糟。
我使用 windows7/Python3.4。我用谷歌搜索了几天,POSH 可能是一个很好的解决方案,但它不能用 python3.4 构建
我有 3 种方法:
1.有没有什么办法可以在Python3.4的进程之间直接共享python对象?作为 POSH
或者
2.是否可以将对象的“指针”传递给子进程并且子进程可以恢复指向python对象的“指针”?
或者
3.multiprocess.Array 可能是一个有效的解决方案,但如果我想共享复杂的数据结构,例如列表,它是如何工作的?我应该在它的基础上创建一个新类并提供接口作为列表吗?
Edit1:
我尝试了第三种方式,但效果更糟。
我定义了这些值:
p_pos = multiprocessing.Value('i') #producer write position
c_pos = multiprocessing.Value('i') #customer read position
databuff = multiprocess.Array('c',buff_len) # shared buffer
Run Code Online (Sandbox Code Playgroud)
和两个功能:
send_data(msg)
get_data()
Run Code Online (Sandbox Code Playgroud)
在send_data函数(父进程)中,它将 msg 复制到 databuff ,并通过管道将开始和结束位置(两个整数)发送到子进程。
比在get_data函数(子进程)中,它接收两个位置并从 databuff 复制 msg。
最后,它比使用管道 @_@ …
我一直在看以下问题,但没有任何运气:
python中的多处理-在多个进程之间共享大对象(例如pandas数据帧)
我编写了一个非常基本的测试文件来说明我正在尝试做的事情:
from collections import deque
from multiprocessing import Process
import numpy as np
class TestClass:
def __init__(self):
self.mem = deque(maxlen=4)
self.process = Process(target=self.run)
def run(self):
while True:
self.mem.append(np.array([0, 1, 2, 3, 4]))
def print_values(x):
while True:
print(x)
test = TestClass()
process = Process(target=print_values(test.mem))
test.process.start()
process.start()
Run Code Online (Sandbox Code Playgroud)
目前这输出以下内容:
deque([], maxlen=4)
Run Code Online (Sandbox Code Playgroud)
如何从主代码或运行“print_values”的进程访问内存值?
我有一个数据框,我执行一些操作并打印出来.要做到这一点,我必须遍历每一行.
for count, row in final_df.iterrows():
x = row['param_a']
y = row['param_b']
# Perform operation
# Write to output file
Run Code Online (Sandbox Code Playgroud)
我决定使用python多处理模块并行化这个
def write_site_files(row):
x = row['param_a']
y = row['param_b']
# Perform operation
# Write to output file
pkg_num = 0
total_runs = final_df.shape[0] # Total number of rows in final_df
threads = []
import multiprocessing
while pkg_num < total_runs or len(threads):
if(len(threads) < num_proc and pkg_num < total_runs):
print pkg_num, total_runs
t = multiprocessing.Process(target=write_site_files,args=[final_df.iloc[pkg_num],pkg_num])
pkg_num = pkg_num + 1
t.start() …Run Code Online (Sandbox Code Playgroud) 我有几个进程,每个进程都需要一个大的 numpy 数组来完成任务,这只是被读取(线程正在搜索它以寻找适当的值)。
如果每个进程加载数据,我会收到内存错误。
因此,我试图通过使用管理器在进程之间共享相同的数组来最小化内存使用量。
但是我仍然收到内存错误。我可以在主进程中加载一次数组,但是当我尝试将其作为管理器命名空间的属性时,我收到了内存错误。我假设管理器就像指针一样,并允许单独的进程(通常只能访问自己的内存)也可以访问这个共享内存。但是错误提到了酸洗:
Traceback (most recent call last):
File <PATH>, line 63, in <module>
ns.pp = something
File "C:\Program Files (x86)\Python35-32\lib\multiprocessing\managers.py", line 1021, in __setattr__
return callmethod('__setattr__', (key, value))
File "C:\Program Files (x86)\Python35-32\lib\multiprocessing\managers.py", line 716, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "C:\Program Files (x86)\Python35-32\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "C:\Program Files (x86)\Python35-32\lib\multiprocessing\reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
MemoryError
Run Code Online (Sandbox Code Playgroud)
我假设 numpy 数组在分配给经理时实际上正在被复制,但我可能错了。
更令人恼火的是,我在一台具有 32GB 内存的机器上,看着内存使用情况,它只会在崩溃前稍微增加一点,最多可能增加 …
python ×9
pandas ×4
numpy ×2
python-3.x ×2
dataframe ×1
dictionary ×1
joblib ×1
large-data ×1
loops ×1
memory ×1
performance ×1
process ×1
sharing ×1