多处理池中的 Boto3 客户端失败,并显示“botocore.exceptions.NoCredentialsError:无法找到凭据”

gho*_*ost 5 multiprocessing python-3.x boto3

我正在使用 boto3 连接到 s3,下载对象并进行一些处理。我正在使用多处理池来执行上述操作。

以下是我正在使用的代码的概要:

session = None

def set_global_session():
    global session
    if not session:
        session = boto3.Session(region_name='us-east-1')

def function_to_be_sent_to_mp_pool():
    s3 = session.client('s3', region_name='us-east-1')
    list_of_b_n_o = list_of_buckets_and_objects
    for bucket, object in list_of_b_n_o:
        content = s3.get_object(Bucket=bucket, Key=key)
        data = json.loads(content['Body'].read().decode('utf-8'))
        write_processed_data_to_a_location()

def main():
    pool = mp.Pool(initializer=set_global_session, processes=40)
    pool.starmap(function_to_be_sent_to_mp_pool, list_of_b_n_o_i)
Run Code Online (Sandbox Code Playgroud)

现在,当 时processes=40,一切都运转良好。什么时候processes = 64,还是不错的。

但是,当我增加到 时processes=128,出现以下错误:

botocore.exceptions.NoCredentialsError: Unable to locate credentials
Run Code Online (Sandbox Code Playgroud)

我们的机器具有访问 S3 所需的 IAM 角色。此外,发生的奇怪的事情是,对于某些进程,它工作正常,而对于其他一些进程,它会抛出凭据错误。为什么会发生这种情况,以及如何解决这个问题?

发生的另一个奇怪的事情是我能够在 2 个单独的终端选项卡中触发两个作业(每个选项卡都有一个单独的 ssh 登录 shell 到机器)。每个作业都会生成 64 个进程,而且效果也很好,这意味着有 128 个进程同时运行。但一个登录 shell 中的 80 个进程会失败。

跟进:

我尝试用一​​种方法为单独的进程创建单独的会话。另一方面,我直接使用创建了 s3-client boto3.client。然而,它们都在 80 个进程中抛出相同的错误。

我还使用以下额外配置创建了单独的客户端:

botocore.exceptions.NoCredentialsError: Unable to locate credentials
Run Code Online (Sandbox Code Playgroud)

这允许我同时使用 80 个进程,但任何超过 80 个进程都会失败并出现相同的错误。

帖子跟进:

有人可以确认他们是否能够boto3在 128 个进程的多重处理中使用吗?

Den*_*iev 4

我怀疑 AWS 最近减少了对元数据请求的限制,因为我突然开始遇到同样的问题。似乎有效的解决方案是在创建池之前查询一次凭据,并让池中的进程显式使用它们,而不是让它们再次查询凭据。

我将 fsspec 与 s3fs 一起使用,这是我的代码:

def get_aws_credentials():
    '''
    Retrieve current AWS credentials.
    '''
    import asyncio, s3fs
    fs = s3fs.S3FileSystem()

    # Try getting credentials
    num_attempts = 5
    for attempt in range(num_attempts):
        credentials = asyncio.run(fs.session.get_credentials())
        if credentials is not None:
            if attempt > 0:
                log.info('received credentials on attempt %s', 1 + attempt)
            return asyncio.run(credentials.get_frozen_credentials())

        time.sleep(15 * (random.random() + 0.5))

    raise RuntimeError('failed to request AWS credentials '
                       'after %d attempts' % num_attempts)


def process_parallel(fn_d, max_processes):
    # [...]
    c = get_aws_credentials()

    # Cache credentials
    import fsspec.config
    prev_s3_cfg = fsspec.config.conf.get('s3', {})
    try:
        fsspec.config.conf['s3'] = dict(prev_s3_cfg,
                                        key=c.access_key,
                                        secret=c.secret_key)

        num_processes = min(len(fn_d), max_processes)

        from concurrent.futures import ProcessPoolExecutor
        with ProcessPoolExecutor(max_workers=num_processes) as pool:
            for data in pool.map(process_file, fn_d, chunksize=10):
                yield data
    finally:
        fsspec.config.conf['s3'] = prev_s3_cfg
Run Code Online (Sandbox Code Playgroud)

原始 boto3 代码看起来本质上是相同的,除了整个 fs.session 和 asyncio.run() 歌曲和舞蹈之外,您将使用 boto3.Session 本身并直接调用其 get_credentials() 和 get_frozen_credentials() 方法。