gho*_*ost 5 multiprocessing python-3.x boto3
我正在使用 boto3 连接到 s3,下载对象并进行一些处理。我正在使用多处理池来执行上述操作。
以下是我正在使用的代码的概要:
session = None
def set_global_session():
global session
if not session:
session = boto3.Session(region_name='us-east-1')
def function_to_be_sent_to_mp_pool():
s3 = session.client('s3', region_name='us-east-1')
list_of_b_n_o = list_of_buckets_and_objects
for bucket, object in list_of_b_n_o:
content = s3.get_object(Bucket=bucket, Key=key)
data = json.loads(content['Body'].read().decode('utf-8'))
write_processed_data_to_a_location()
def main():
pool = mp.Pool(initializer=set_global_session, processes=40)
pool.starmap(function_to_be_sent_to_mp_pool, list_of_b_n_o_i)
Run Code Online (Sandbox Code Playgroud)
现在,当 时processes=40,一切都运转良好。什么时候processes = 64,还是不错的。
但是,当我增加到 时processes=128,出现以下错误:
botocore.exceptions.NoCredentialsError: Unable to locate credentials
Run Code Online (Sandbox Code Playgroud)
我们的机器具有访问 S3 所需的 IAM 角色。此外,发生的奇怪的事情是,对于某些进程,它工作正常,而对于其他一些进程,它会抛出凭据错误。为什么会发生这种情况,以及如何解决这个问题?
发生的另一个奇怪的事情是我能够在 2 个单独的终端选项卡中触发两个作业(每个选项卡都有一个单独的 ssh 登录 shell 到机器)。每个作业都会生成 64 个进程,而且效果也很好,这意味着有 128 个进程同时运行。但一个登录 shell 中的 80 个进程会失败。
跟进:
我尝试用一种方法为单独的进程创建单独的会话。另一方面,我直接使用创建了 s3-client boto3.client。然而,它们都在 80 个进程中抛出相同的错误。
我还使用以下额外配置创建了单独的客户端:
botocore.exceptions.NoCredentialsError: Unable to locate credentials
Run Code Online (Sandbox Code Playgroud)
这允许我同时使用 80 个进程,但任何超过 80 个进程都会失败并出现相同的错误。
帖子跟进:
有人可以确认他们是否能够boto3在 128 个进程的多重处理中使用吗?
我怀疑 AWS 最近减少了对元数据请求的限制,因为我突然开始遇到同样的问题。似乎有效的解决方案是在创建池之前查询一次凭据,并让池中的进程显式使用它们,而不是让它们再次查询凭据。
我将 fsspec 与 s3fs 一起使用,这是我的代码:
def get_aws_credentials():
'''
Retrieve current AWS credentials.
'''
import asyncio, s3fs
fs = s3fs.S3FileSystem()
# Try getting credentials
num_attempts = 5
for attempt in range(num_attempts):
credentials = asyncio.run(fs.session.get_credentials())
if credentials is not None:
if attempt > 0:
log.info('received credentials on attempt %s', 1 + attempt)
return asyncio.run(credentials.get_frozen_credentials())
time.sleep(15 * (random.random() + 0.5))
raise RuntimeError('failed to request AWS credentials '
'after %d attempts' % num_attempts)
def process_parallel(fn_d, max_processes):
# [...]
c = get_aws_credentials()
# Cache credentials
import fsspec.config
prev_s3_cfg = fsspec.config.conf.get('s3', {})
try:
fsspec.config.conf['s3'] = dict(prev_s3_cfg,
key=c.access_key,
secret=c.secret_key)
num_processes = min(len(fn_d), max_processes)
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=num_processes) as pool:
for data in pool.map(process_file, fn_d, chunksize=10):
yield data
finally:
fsspec.config.conf['s3'] = prev_s3_cfg
Run Code Online (Sandbox Code Playgroud)
原始 boto3 代码看起来本质上是相同的,除了整个 fs.session 和 asyncio.run() 歌曲和舞蹈之外,您将使用 boto3.Session 本身并直接调用其 get_credentials() 和 get_frozen_credentials() 方法。