How do I use a boto3 client with Python multiprocessing?

RNH*_*TTR 10 python python-multiprocessing boto3 starmap

The code looks like this:

import multiprocessing as mp
from functools import partial

import boto3
import numpy as np


s3 = boto3.client('s3')

def _something(part, target_part, **kwargs):  # starmap passes each (part, target_part) pair positionally
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)


def do(s3):
    archive = np.load(s3.get_object('some_key')) # Simplified -- details not relevant
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)

    out = pool.starmap(sub_process, zip(parts, target_parts))  # Error occurs at this line

    pool.close()
    pool.join()

do(s3)

Error:

_pickle.PicklingError: Can't pickle <class 'botocore.client.S3'>: attribute lookup S3 on botocore.client failed

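My experience with Python's multiprocessing library is very limited. I don't understand why the error above is raised when the S3 client is not a parameter of any of the functions. Note that the code runs fine if the archive is loaded from disk instead of from S3.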

Any help or guidance would be greatly appreciated.

RNH*_*TTR 6

Objects passed to Pool.starmap() must be picklable, and the S3 client is not. Moving the S3 client work out of the function that calls Pool.starmap() fixes the problem:

import multiprocessing as mp
from functools import partial

import boto3
import numpy as np


s3 = boto3.client('s3')
# Simplified; details not relevant. The S3 call now lives here, outside of do().
archive = np.load(s3.get_object('some_key'))

def _something(part, target_part, **kwargs):  # starmap passes each (part, target_part) pair positionally
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)


def do(archive): # pass the previously loaded archive, and not the s3 object into the function
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)

    out = pool.starmap(sub_process, zip(parts, target_parts))

    pool.close()
    pool.join()

do(archive) # pass the previously loaded archive, and not the s3 object into the function
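
If it is not obvious which object the pool is choking on, one way to narrow it down is to try pickling each candidate by hand before handing it to the pool. The sketch below is only an illustration: find_unpicklable and StandInClient are made-up names, and StandInClient merely imitates an unpicklable client by holding a lock, so the example runs without boto3 or AWS credentials.

```python
import pickle
import threading


def find_unpicklable(**named_objects):
    """Return the names of the objects that pickle refuses to serialize."""
    bad = []
    for name, obj in named_objects.items():
        try:
            pickle.dumps(obj)
        except Exception:  # pickle may raise PicklingError, TypeError or AttributeError
            bad.append(name)
    return bad


class StandInClient:
    """Hypothetical stand-in for an S3 client: it holds a lock, which cannot be pickled."""

    def __init__(self):
        self._lock = threading.Lock()


# Plain data pickles fine; the client-like object does not.
print(find_unpicklable(parts=[[1, 2], [3, 4]], slack=0.1, client=StandInClient()))
# prints ['client']
```

In real code you would pass the actual function, partial, and argument lists that go to the pool; anything reported here would have to be loaded or created somewhere other than in the data sent to the workers.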