多个并行 AWS Lambda 调用

mak*_*sim 2 amazon-web-services python-asyncio aws-lambda python-3.7

我正在尝试使用 python 3.7.2 和 aiobotocore 包执行多个 AWS Lambda 调用。这是我的代码。

import asyncio
import aiobotocore


async def invoke(payload, session):
    async with session.create_client('lambda', region_name='us-east-1') as client:
        return await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)


def generate_invocations(payloads, session):
    for payload in payloads:
        yield invoke(payload, session)


def invoke_all(payloads):
    loop = asyncio.get_event_loop()

    async def wrapped():
        session = aiobotocore.get_session(loop=loop)
        invocations = generate_invocations(payloads, session)
        return await asyncio.gather(*invocations)

    return loop.run_until_complete(wrapped())


def main():
    payloads_list = []  # MY PAYLOADS LIST 
    lambda_responses = invoke_all(payloads_list)
    print(lambda_responses)


if __name__ == '__main__':
    main()

Run Code Online (Sandbox Code Playgroud)

代码运行速度非常快(10 个有效负载大约 1 秒,而不是使用 boto3 lambda 客户端调用 15 个有效负载),但我有两个问题:

1) lambda_responses 中的元素包括“Payload”键,其值的类型为 aiobotocore.response.StreamingBody。当我尝试 value.read() 时,我收到“coroutine object StreamingBody.read”,我认为我的代码中存在一些问题。我可以通过“json.loads(json.loads(r['Payload']._buffer.pop())['body'])”收到所需的响应,但是获取它的正确方法是什么。

2) 在极少数情况下,响应之一中的“有效负载”缓冲区为空。如何确保 invoke_all 函数返回非空响应?aiobotocore 的用法正确吗?

我是 python 3 和异步功能的新手。受到aiobotocore文档和 Mathew Marcus博客示例的启发。

谢谢!

use*_*342 6

  1. lambda_responses 中的元素包括“Payload”键,其值的类型为 aiobotocore.response.StreamingBody。当我尝试 value.read() 时,我收到“coroutine object StreamingBody.read”

这意味着read()协程应该等待,您应该在事件循环中执行此操作。例如,您可以更改invoke协程以读取响应:

async def invoke(payload, session):
    async with session.create_client('lambda', region_name='us-east-1') as client:
        resp = await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
        payload = await resp['Payload'].read()
        return payload  # or assemble a dict with relevant parts
Run Code Online (Sandbox Code Playgroud)
  1. 在极少数情况下,响应之一中的“有效负载”缓冲区为空。

这可能是因为您在实际读取内容之前访问缓冲区。在某些情况下,信息很快就会到达,您无论如何都能在内部缓冲区中找到它,但有时您必须等待它。使用公共方法(例如)read()可确保您正确使用 API。_buffer另一方面,该属性以下划线开头,这表示它是一个实现细节,不能直接访问。