正确设计Multiprocessing.Manager自定义对象

cod*_*000 8 python amazon-s3 multiprocessing python-2.7

我想使用multiprocessing.Manager()对象,这样我就可以异步地将信息从worker发送到管理器,以便将信息发送到服务器.我所拥有的是将PDF写入磁盘的大约10个实例.然后,我想使用多处理包中的manager对象将该数据发送到我的S3存储桶,因为我不想阻止本地内容生成.

所以我想知道我是否创建了一个自定义管理器对象,这是正确的方法吗?提交给经理对象的每个进程是否会排队?或者,如果我呼叫多个上传,经理会丢弃一些电话吗?

以下是我正在考虑的示例代码:

from multiprocessing.managers import BaseManager

class UploadClass(object):
    def upload(self, filePath, params, destUrl):
        # do stuff
        return results

class MyManager(BaseManager):
    pass

MyManager.register('uploads', UploadClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    upload = manager.uploads()
    # do this wait for completion or do they perform this async
    print upload.upload(r"< path >", {...}, "some url")
    print upload.upload(r"< path >", {...}, "some url")
Run Code Online (Sandbox Code Playgroud)

dan*_*ano 2

直接回答你的一些问题:

提交给管理器对象的每个进程都会排队吗?

服务器Manager生成一个新线程来处理每个传入请求,因此您的所有请求都将立即开始处理。您可以在以下位置看到这一点multiprocessing/managers.py

def serve_forever(self):
    '''
    Run the server forever
    '''
    current_process()._manager_server = self
    try:
        try:
            while 1:
                try:
                    c = self.listener.accept()
                except (OSError, IOError):
                    continue
                t = threading.Thread(target=self.handle_request, args=(c,))
                t.daemon = True
                t.start()
        except (KeyboardInterrupt, SystemExit):
            pass
    finally:
        self.stop = 999
        self.listener.close()
Run Code Online (Sandbox Code Playgroud)

如果我呼叫多个上传,管理员会挂断部分呼叫吗?

不会,任何呼叫都不会掉线。

# do this wait for completion or do they perform this async
print upload.upload(r"< path >", {...}, "some url")
print upload.upload(r"< path >", {...}, "some url")
Run Code Online (Sandbox Code Playgroud)

两个调用upload.upload都是同步的;UploadClass.upload在完成之前他们不会返回。但是,如果您有多个脚本/线程/进程upload.upload同时调用,则每个唯一的调用都将在服务器进程中其自己的线程内同时发生Manager

还有你最重要的问题:

这是执行此操作的正确方法吗?

如果我正确理解了这个问题,我会说不。如果您只有一个脚本,然后multiprocessing.Process在该脚本内生成 10 个实例来写出 PDF,那么您应该使用另一个multiprocessing.Process脚本来处理上传:

def upload(self, q):
    for payload in iter(q.get, None):  # Keep getting from the queue until a None is found
        filePath, params, destUrl = payload
        # do stuff

def write_pdf(pdf_file_info, q):
   # write a pdf to disk here
   q.put((filepath, params, destUrl))  # Send work to the uploader
   # Move on with whatever comes next.

if __name__ == '__main__':
    pdf_queue = multiprocessing.Queue()

    # Start uploader
    upload_proc = multiprocessing.Process(upload, args=(pdf_queue,))
    upload_proc.start()

    # Start pdf writers
    procs = []
    for pdf in pdfs_to_write: 
         p = multiprocessing.Process(write_pdf, args=(pdf, pdf_queue))
         p.start()
         p.append(procs)

    # Wait for pdf writers and uploader to finish.
    for p in procs:
        p.join()
    pdf_queue.put(None) # Sending None breaks the for loop inside upload
    upload_proc.join()
Run Code Online (Sandbox Code Playgroud)

如果您确实可以接受并发上传,那么根本不需要单独的upload流程 - 只需直接从 pdf 写入流程上传即可。

不过,很难从你的问题中判断这是否正是你正在做的事情。一旦您澄清,我将调整最后一部分以适合您的特定用例。