在子进程已经开始之后访问共享内存

Sha*_*hin 11 python ipc shared-memory multiprocessing

如果数据仅在子进程生成后可用(使用multiprocessing.Process),如何让子进程访问共享内存中的数据?

我知道multiprocessing.sharedctypeype.RawArray,但我无法弄清楚如何让我的子进程访问RawArray在进程已经启动后创建的进程.

数据由父进程生成,并且数据量事先不知道.

如果不是GIL我将使用线程,这将使这个任务更简单.使用非CPython实现不是一种选择.


muliprocessing.sharedctypes的引擎下,看起来共享ctype对象是使用mmaped内存分配的.

所以这个问题实际上归结为:子进程生成后,如果mmap()父进程调用,子进程是否可以访问匿名映射的内存?

这有点像在这个问题中被问到的内容,除了在我的情况下调用者mmap()是父进程而不是子进程.


(解决了)

我创建了自己的版本RawArray,它使用shm_open()引擎盖下.只要identifier(tag)匹配,生成的共享ctypes数组就可以与任何进程共享.

有关详细信息和示例,请参阅此答案.

sar*_*old 6

您的问题听起来非常适合posix_ipcsysv_ipc模块,它们为共享内存,信号量和消息队列提供POSIX或SysV API.其中的特征矩阵包括在他提供的模块中挑选的极好建议.

匿名mmap(2)区域的问题在于您无法轻松地与其他进程共享它们 - 如果它们是文件支持的,那么它很容易,但如果您实际上不需要该文件用于其他任何进程,那就感觉很愚蠢.如果这是在C中,你可以使用该CLONE_VM标志进行clone(2)系统调用,但我不想尝试将它与语言解释器一起使用,这可能会对内存安全性做出假设.(即使在C语言中它也有点危险,因为五年后的维护程序员也会对这种CLONE_VM行为感到震惊.)

但是SysV和更新的POSIX共享内存映射允许甚至不相关的进程通过标识符附加和分离共享内存,因此您需要做的就是从使用映射的进程创建映射的进程共享标识符,然后当您在映射中操作数据时,它们可同时用于所有进程,而无需任何额外的解析开销.该shm_open(3)函数返回一个int在以后的调用中用作文件描述符ftruncate(2)然后mmap(2),因此其他进程可以使用共享内存段而不在文件系统中创建文件 - 即使所有使用它的进程都已退出,该内存也将保持不变.(或许对Unix来说有点奇怪,但它很灵活.)


Sha*_*hin 6

免责声明:我是问题的作者.

我最终使用posix_ipc模块创建了我自己的RawArray版本.我主要使用posix_ipc.SharedMemory哪种电话shm_open().

我的implementation(ShmemRawArray)公开了相同的功能,RawArray但需要两个额外的参数 - 一个tag用于唯一标识共享内存区域,另一个create用于确定是否应创建新的共享内存段或附加到现有内存段的标志.

如果有人感兴趣的话,这是一个副本:https://gist.github.com/1222327

ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)
Run Code Online (Sandbox Code Playgroud)

使用说明:

  • 前两个args(typecode_or_typesize_or_initializer)应该与使用相同RawArray.
  • 只要tag匹配,任何进程都可以访问共享数组.
  • ShmemRawArray(..., create=True)删除原始对象(返回者)时,共享内存段将被取消链接
  • 使用tag当前存在的共享数组创建共享数组将引发一个ExistentialError
  • 使用tag不存在的共享数组(或者已经取消链接的数组)访问共享数组也会引发共享数组ExistentialError

一个SSCCE(短,自包含,可编译的例子)显示它在行动.

#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray

class Point(ctypes.Structure):
    _fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]

def worker(q):
    # get access to ctypes array shared by parent
    count, tag = q.get()
    shared_data = ShmemRawArray(Point, count, tag, False)

    proc_name = multiprocessing.current_process().name
    print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]

if __name__ == '__main__':
    procs = []
    np = multiprocessing.cpu_count()
    queue = multiprocessing.Queue()

    # spawn child processes
    for i in xrange(np):
        p = multiprocessing.Process(target=worker, args=(queue,))
        procs.append(p)
        p.start()

    # create a unique tag for shmem segment
    tag = "stack-overflow-%d" % multiprocessing.current_process().pid

    # random number of points with random data
    count = randint(3,10) 
    combined_data = [Point(x=random(), y=random()) for i in xrange(count)]

    # create ctypes array in shared memory using ShmemRawArray
    # - we won't be able to use multiprocssing.sharectypes.RawArray here 
    #   because children already spawned
    shared_data = ShmemRawArray(Point, combined_data, tag)

    # give children info needed to access ctypes array
    for p in procs:
        queue.put((count, tag))

    print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
    for p in procs:
        p.join()
Run Code Online (Sandbox Code Playgroud)

运行此结果会产生以下输出:

[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Run Code Online (Sandbox Code Playgroud)