6 python multiprocessing python-multithreading pandas python-multiprocessing
在一个答案中:共享只读数据是否复制到不同的进程以进行多处理?给出了 numpy 数组共享内存的工作解决方案。
如果使用 pandas DataFrame 会是什么样子?
背景:我希望能够在多重处理期间写入数据帧,并希望能够在多重处理完成后进一步处理它。
如果您不想使用 dask,可以使用共享内存共享 pandas 数据帧,方法是首先将其转换为 numpy 数组,然后在子进程中重建它。
from multiprocessing import shared_memory
def create_shared_block(to_share, dtypes):
# float64 can't be pickled
for col, dtype in to_share.dtypes.items():
if dtype == 'float64':
to_share[col] = pd.to_numeric(to_share[col], downcast='float')
# make the dataframe a numpy array
to_share.reset_index(inplace=True)
# drop the index if named index
to_share = to_share.drop('index', axis=1)
# get the dtypes in the same order as the dataframe columns and make sure the types are correct for numpy
dtypes_sorted = sort_dtypes(to_share, dtypes)
# get the dataframe values in the format expected by numpy
values = [tuple(x) for x in to_share.values.tolist()]
# create a numpy array
to_share = np.array(values, dtype=(dtypes_sorted))
# create a shared memory of the size of the array
shm = shared_memory.SharedMemory(create=True, size=to_share.nbytes)
# now create a NumPy array backed by shared memory
np_array = np.ndarray(to_share.shape, dtype=dtypes_sorted, buffer=shm.buf)
# Copy the original data into shared memory
np_array[:] = to_share[:]
return shm, np_array, dtypes_sorted
def sort_dtypes(df, dtypes):
# category is a pandas dtype, not numpy
string_types = ('category', 'object', '|S')
dtypes = [(x, '|S{}'.format(df[x].str.len().max())) if y in string_types else (x, y) for x, y in dtypes if
x in df.columns]
# build a lookup
dtypes_dict = {x: y for x, y in dtypes}
# fix the order
dtypes_sorted = [(x, dtypes_dict[x]) for x in df.columns]
return dtypes_sorted
# ------PARENT PROCESS-------#
# create your shared memory
to_share = pd.DataFrame([['obstacle','obstacle',2,3],['obstacles','obstacle',2,np.nan]],columns=['w1','w2','d1','d2'])
dtypes = [('w1','str'),('w2','|S'),('d1','f'),('d2','f')]
shm, arr, dtypes_sorted = create_shared_block(to_share, dtypes)
# then pass these values to your child processes
shared = (shm.name, arr.shape, dtypes_sorted)
# ------CHILD PROCESS-------#
# assuming you have passed to the child process in a variable called shared, you can reconstruct the dataframe as follows
shared_memory = shared_memory.SharedMemory(name=shared[0])
np_array = np.ndarray(shared[1], dtype=shared[2], buffer=shared_memory.buf)
columns = [x for x, y in shared[2]]
df = pd.DataFrame(np_array, columns=columns)
Run Code Online (Sandbox Code Playgroud)
当共享 100k 行数据帧时,这在我的应用程序中节省了一些内存,但可能不如我使用 dask 等已建立的库节省的内存那么多。我不太确定重新创建 pandas 数据帧所涉及的开销 - 我想它只是引用共享的 numpy 数组并在顶部添加一些额外的东西以使其成为 df。