Async、Lambda、Boto3、ThreadPoolExecutor:连接池已满,正在丢弃连接

Fal*_*aea 8 python asynchronous threadpoolexecutor boto3 aws-lambda

一些背景:

我正在开发一个测试套件,用于使用 CodePipeline 和两个 lambda 函数更新和测试全局重定向:一个包装器函数和一个测试器函数。

包装器循环遍历域列表。对于每个域,它获取关联的 S3 文件,然后创建一长串对象(文件中每行一个对象)。示例对象:

redirect = {
     'staging': None,
     'prod': None,
     'country': 'US',
     'language': 'EN',
     'status_code': None,
     'shortlink': None,
     'expected': None
}
Run Code Online (Sandbox Code Playgroud)

然后,包装器获取此对象列表并调用第二个 lambda(测试器 lambda),该 lambda 为每个对象发送一系列 httpx 请求等。

其中最重要的一点是它是异步的 - 否则,lambda 将在 15 分钟时超时。

实际问题:

包装函数只能触发测试器 10 次。否则,当返回结果时,我会收到以下错误:

Connection pool is full, discarding connection: lambda.us-east-1.amazonaws.com
Run Code Online (Sandbox Code Playgroud)

我提高了 ThreadPoolExecutor 的 max_workers,甚至为 boto3 设置了 max_workers。我与 AWS 支持人员进行了交谈,我的实际 lambda 设置有利于我的测试器 lambda 根据需要多次触发。然而,我仍然收到连接池错误。

相关代码:

from botocore.client import Config
from boto3.session import Session
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3

max_pool_connections = 30

config = Config(
    max_pool_connections=max_pool_connections,
    read_timeout = 900
)

def handler(event, context):
    try:
        events = ... # generated via a bunch of nonsense 

        with ThreadPoolExecutor(max_pool_connections) as executor:
            futures = []
            for event in events:
                print(event)
                future = executor.submit(lambda_client.invoke, FunctionName = "site-tester", InvocationType = "RequestResponse", Payload = json.dumps(event))
                futures.append(future)

            for index, future in enumerate(as_completed(futures, timeout=None), start=1):
                ...
Run Code Online (Sandbox Code Playgroud)

这(显然)不是完整的代码,因为完整的代码绝对是一堆废话。我目前没有时间创建完整的测试功能,但如果有人有任何初步想法或故障排除提示,我们将不胜感激。

我确实想指出,对于确实被调用并返回的 10 个事件,一切都按预期进行。仅限 10 次,但我确实需要更多。

同样,其中最重要的部分是它是异步的 - 否则,lambda 将在 15 分钟标记处超时。

并发设置

包装函数:保留并发设置为 30

测试器功能:保留并发设置为 30

Svi*_*huk 11

正如 JustinTArthur 在Github上提到的:

这个警告没问题。如果您深入研究 urllib3 连接池代码,它基本上是持久连接池,而不是您可以拥有的最大并发连接数。如果端点尚未关闭池中的连接,则会重新使用它们。

如果您想增加此池的大小,可以使用低级 botocore 配置在每个端点上完成。boto3 示例:

import boto3
import botocore

client_config = botocore.config.Config(
    max_pool_connections=25,
)
boto3.client('lambda', config=client_config)
Run Code Online (Sandbox Code Playgroud)

或者

s3_client = boto3.client('s3', config=botocore.client.Config(max_pool_connections=50))
Run Code Online (Sandbox Code Playgroud)


HTF*_*HTF 4

因此警告本身来自用于发出 HTTP 请求的urllib3库。boto3

ConnectionPool类的配置选项max_pool_connections设置。maxsize

您可以尝试进一步增加此选项,看看这是否有帮助,但不要添加更多线程工作人员。

目前尚不清楚如何从代码片段中创建客户端,但看起来资源不是线程安全的,应该为每个线程/进程创建一个单独的资源:

多线程和多处理:

笔记

资源不是线程安全的。这些特殊类包含无法在线程之间共享的附加元数据。使用资源时,建议为每个线程实例化一个新的资源,如上例所示。

低级客户端线程安全的。使用低级客户端时,建议实例化客户端,然后将该客户端对象传递给每个线程。