Bas*_*sen 11 python numpy shared-memory multiprocessing
我已经阅读了很多有关共享数组的问题,对于简单的数组来说这似乎很简单,但我仍然试图让它适用于我拥有的数组.
import numpy as np
data=np.zeros(250,dtype='float32, (250000,2)float32')
Run Code Online (Sandbox Code Playgroud)
我尝试以某种方式mp.Array接受它data,尝试将其转换为共享数组,我也尝试使用ctypes创建数组:
import multiprocessing as mp
data=mp.Array('c_float, (250000)c_float',250)
Run Code Online (Sandbox Code Playgroud)
我设法使代码工作的唯一方法是不将数据传递给函数,而是将编码的字符串传递给未压缩/解码,但最终会调用n(字符串数)进程,这似乎是多余的.我希望的实现是基于将二进制字符串列表切换为x(进程数)并传递此块,data以及a index除了data在本地修改之外的有效进程,因此关于如何使其共享的问题,任何示例工作使用自定义(嵌套)numpy数组已经是一个很好的帮助.
PS:这个问题是Python多处理的后续问题
unu*_*tbu 10
请注意,您可以从一个复杂的dtype数组开始:
In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')
Run Code Online (Sandbox Code Playgroud)
并将其视为同源dtype数组:
In [5]: data2 = data.view('float32')
Run Code Online (Sandbox Code Playgroud)
然后,将其转换回复杂的dtype:
In [7]: data3 = data2.view('float32, (250000,2)float32')
Run Code Online (Sandbox Code Playgroud)
改变dtype是一种非常快速的操作; 它不会影响基础数据,只会影响NumPy解释它的方式.所以改变dtype几乎是无成本的.
因此,您所读到的关于具有简单(同质)dtypes的数组的内容可以通过上述技巧轻松应用于复杂的dtype.
下面的代码借用了JF Sebastian的答案中的许多想法.
import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64
def decode(arg):
chunk, counter = arg
print len(chunk), counter
for x in chunk:
peak_counter = 0
data_buff = base64.b64decode(x)
buff_size = len(data_buff) / 4
unpack_format = ">%dL" % buff_size
index = 0
for y in struct.unpack(unpack_format, data_buff):
buff1 = struct.pack("I", y)
buff2 = struct.unpack("f", buff1)[0]
with shared_arr.get_lock():
data = tonumpyarray(shared_arr).view(
[('f0', '<f4'), ('f1', '<f4', (250000, 2))])
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
peak_counter += 1
index += 1
counter += 1
def pool_init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument
def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj())
def numpy_array(shared_arr, peaks):
"""Fills the NumPy array 'data' with m/z-intensity values acquired
from b64 decoding and unpacking the binary string read from the
mzXML file, which is stored in the list 'peaks'.
The m/z values are assumed to be ordered without validating this
assumption.
Note: This function uses multi-processing
"""
processors = mp.cpu_count()
with contextlib.closing(mp.Pool(processes=processors,
initializer=pool_init,
initargs=(shared_arr, ))) as pool:
chunk_size = int(len(peaks) / processors)
map_parameters = []
for i in range(processors):
counter = i * chunk_size
# WARNING: I removed -1 from (i + 1)*chunk_size, since the right
# index is non-inclusive.
chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
map_parameters.append((chunk, counter))
pool.map(decode, map_parameters)
if __name__ == '__main__':
shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
peaks = ...
numpy_array(shared_arr, peaks)
Run Code Online (Sandbox Code Playgroud)
如果可以保证执行分配的各种进程
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
Run Code Online (Sandbox Code Playgroud)
永远不会竞争改变相同位置的数据,那么我相信你实际上可以放弃使用锁
with shared_arr.get_lock():
Run Code Online (Sandbox Code Playgroud)
但是我不能很好地理解你的代码以确定,所以为了安全起见,我把锁包括在内.
| 归档时间: |
|
| 查看次数: |
3257 次 |
| 最近记录: |