Fal*_*aea 8 python asynchronous threadpoolexecutor boto3 aws-lambda
一些背景:
我正在开发一个测试套件,用于使用 CodePipeline 和两个 lambda 函数更新和测试全局重定向:一个包装器函数和一个测试器函数。
包装器循环遍历域列表。对于每个域,它获取关联的 S3 文件,然后创建一长串对象(文件中每行一个对象)。示例对象:
redirect = {
'staging': None,
'prod': None,
'country': 'US',
'language': 'EN',
'status_code': None,
'shortlink': None,
'expected': None
}
Run Code Online (Sandbox Code Playgroud)
然后,包装器获取此对象列表并调用第二个 lambda(测试器 lambda),该 lambda 为每个对象发送一系列 httpx 请求等。
其中最重要的一点是它是异步的 - 否则,lambda 将在 15 分钟时超时。
实际问题:
包装函数只能触发测试器 10 次。否则,当返回结果时,我会收到以下错误:
Connection pool is full, discarding connection: lambda.us-east-1.amazonaws.com
Run Code Online (Sandbox Code Playgroud)
我提高了 ThreadPoolExecutor 的 max_workers,甚至为 boto3 设置了 max_workers。我与 AWS 支持人员进行了交谈,我的实际 lambda 设置有利于我的测试器 lambda 根据需要多次触发。然而,我仍然收到连接池错误。
相关代码:
from botocore.client import Config
from boto3.session import Session
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
max_pool_connections = 30
config = Config(
max_pool_connections=max_pool_connections,
read_timeout = 900
)
def handler(event, context):
try:
events = ... # generated via a bunch of nonsense
with ThreadPoolExecutor(max_pool_connections) as executor:
futures = []
for event in events:
print(event)
future = executor.submit(lambda_client.invoke, FunctionName = "site-tester", InvocationType = "RequestResponse", Payload = json.dumps(event))
futures.append(future)
for index, future in enumerate(as_completed(futures, timeout=None), start=1):
...
Run Code Online (Sandbox Code Playgroud)
这(显然)不是完整的代码,因为完整的代码绝对是一堆废话。我目前没有时间创建完整的测试功能,但如果有人有任何初步想法或故障排除提示,我们将不胜感激。
我确实想指出,对于确实被调用并返回的 10 个事件,一切都按预期进行。仅限 10 次,但我确实需要更多。
同样,其中最重要的部分是它是异步的 - 否则,lambda 将在 15 分钟标记处超时。
并发设置
包装函数:保留并发设置为 30
测试器功能:保留并发设置为 30
Svi*_*huk 11
正如 JustinTArthur 在Github上提到的:
这个警告没问题。如果您深入研究 urllib3 连接池代码,它基本上是持久连接池,而不是您可以拥有的最大并发连接数。如果端点尚未关闭池中的连接,则会重新使用它们。
如果您想增加此池的大小,可以使用低级 botocore 配置在每个端点上完成。boto3 示例:
import boto3
import botocore
client_config = botocore.config.Config(
max_pool_connections=25,
)
boto3.client('lambda', config=client_config)
Run Code Online (Sandbox Code Playgroud)
或者
s3_client = boto3.client('s3', config=botocore.client.Config(max_pool_connections=50))
Run Code Online (Sandbox Code Playgroud)
因此警告本身来自用于发出 HTTP 请求的urllib3库。boto3
ConnectionPool类的配置选项max_pool_connections设置。maxsize
您可以尝试进一步增加此选项,看看这是否有帮助,但不要添加更多线程工作人员。
目前尚不清楚如何从代码片段中创建客户端,但看起来资源不是线程安全的,应该为每个线程/进程创建一个单独的资源:
笔记
资源不是线程安全的。这些特殊类包含无法在线程之间共享的附加元数据。使用资源时,建议为每个线程实例化一个新的资源,如上例所示。
低级客户端是线程安全的。使用低级客户端时,建议实例化客户端,然后将该客户端对象传递给每个线程。
| 归档时间: |
|
| 查看次数: |
10572 次 |
| 最近记录: |