模拟 AWS 服务和 Lambda 最佳实践

mhe*_*eck 5 amazon-web-services typescript jestjs aws-lambda ts-jest

我正在开发一个简单的 AWS lambda 函数,该函数由 DynamoDB Streams 事件触发,并且应该将除REMOVE事件之外的所有记录转发到 SQS 队列。该功能按预期工作,没有什么意外。

我想编写一个单元测试来测试在事件发生时不向 SQS 提交任何内容的行为DELETE。我首先使用aws-sdk-mock尝试过此操作。正如您在函数代码中看到的,我尝试通过在处理程序代码之外初始化 SQS 客户端来遵守 lambda 最佳实践。显然,这会阻止aws-sdk-mock模拟 SQS 服务(GitHub 上有一个与此相关的问题: https: //github.com/dwyl/aws-sdk-mock/issues/206)。

然后,我尝试使用jest来模拟 SQS ,这需要更多代码才能正确完成,但我最终遇到了同样的问题,需要将 SQS 的初始化放在处理程序函数中,这违反了 lambda 最佳实践。

如何为此函数编写单元测试,同时让 SQS client() 的初始化const sqs: SQS = new SQS()在处理程序之外进行?我是否以错误的方式嘲笑服务,或者是否需要更改处理程序的结构以使其更易于测试?

我知道这个 lambda 函数非常简单,单元测试可能是不必要的,但我必须用更复杂的逻辑编写更多的 lambda,我认为这个非常适合演示这个问题。

索引.ts

import {DynamoDBStreamEvent, DynamoDBStreamHandler} from "aws-lambda";
import SQS = require("aws-sdk/clients/sqs");
import DynamoDB = require("aws-sdk/clients/dynamodb");

const sqs: SQS = new SQS()

export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
    const QUEUE_URL = process.env.TARGET_QUEUE_URL
    if (QUEUE_URL.length == 0) {
        throw new Error('TARGET_QUEUE_URL not set or empty')
    }
    await Promise.all(
        event.Records
            .filter(_ => _.eventName !== "REMOVE")
            .map((record) => {
                const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
                let request: SQS.SendMessageRequest = {
                    MessageAttributes: {
                        "EVENT_NAME": {
                            DataType: "String",
                            StringValue: record.eventName
                        }
                    },
                    MessageBody: JSON.stringify(unmarshalled),
                    QueueUrl: QUEUE_URL,
                }
                return sqs.sendMessage(request).promise()
            })
    );
}
Run Code Online (Sandbox Code Playgroud)

索引规格

import {DynamoDBRecord, DynamoDBStreamEvent, StreamRecord} from "aws-lambda";
import {AttributeValue} from "aws-lambda/trigger/dynamodb-stream";
import {handleDynamoDbEvent} from "./index";
import {AWSError} from "aws-sdk/lib/error";
import {PromiseResult, Request} from "aws-sdk/lib/request";
import * as SQS from "aws-sdk/clients/sqs";
import {mocked} from "ts-jest/utils";
import DynamoDB = require("aws-sdk/clients/dynamodb");


jest.mock('aws-sdk/clients/sqs', () => {
    return jest.fn().mockImplementation(() => {
        return {
            sendMessage: (params: SQS.Types.SendMessageRequest, callback?: (err: AWSError, data: SQS.Types.SendMessageResult) => void): Request<SQS.Types.SendMessageResult, AWSError> => {
                // @ts-ignore
                const Mock = jest.fn<Request<SQS.Types.SendMessageResult, AWSError>>(()=>{
                    return {
                        promise: (): Promise<PromiseResult<SQS.Types.SendMessageResult, AWSError>> => {
                            return new Promise<PromiseResult<SQS.SendMessageResult, AWSError>>(resolve => {
                                resolve(null)
                            })
                        }
                    }
                })
                return new Mock()
            }
        }
    })
});


describe.only('Handler test', () => {

    const mockedSqs = mocked(SQS, true)

    process.env.TARGET_QUEUE_URL = 'test'
    const OLD_ENV = process.env;

    beforeEach(() => {
        mockedSqs.mockClear()
        jest.resetModules();
        process.env = {...OLD_ENV};
    });

    it('should write INSERT events to SQS', async () => {
        console.log('Starting test')
        await handleDynamoDbEvent(createEvent(), null, null)
        expect(mockedSqs).toHaveBeenCalledTimes(1)
    });
})
Run Code Online (Sandbox Code Playgroud)

Phu*_*yen 1

只是我将如何处理这个问题的粗略想法:

  • 我不会在主函数内进行实际的 SQS 发送/操作,而是为消息客户端创建一个接口。像这样的东西:
interface QueueClient {
    send(eventName: string, body: string): Promise<any>;
}
Run Code Online (Sandbox Code Playgroud)
  • 并创建一个实现该接口的实际类以与 SQS 进行交互:
class SQSQueueClient implements QueueClient {
    queueUrl: string
    sqs: SQS

    constructor() {
        this.queueUrl = process.env.TARGET_QUEUE_URL;
        if (this.queueUrl.length == 0) {
            throw new Error('TARGET_QUEUE_URL not set or empty')
        }
        this.sqs = new SQS();
    }

    send(eventName: string, body: string): Promise<any> {
        let request: SQS.SendMessageRequest = {
            MessageAttributes: {
                "EVENT_NAME": {
                    DataType: "String",
                    StringValue: eventName
                }
            },
            MessageBody: body,
            QueueUrl: this.queueUrl,
        }
        return this.sqs.sendMessage()
    }
}
Run Code Online (Sandbox Code Playgroud)

本课程了解如何将数据转换为 SQS 格式的详细信息

  • 然后我将主函数分成 2 个。入口点只是解析队列 url,创建 sqs 队列客户端的实际实例并调用process(). 主要逻辑在process()
const queueClient = new SQSQueueClient();

export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
    return process(queueClient, event);
}

export const process = async (queueClient: QueueClient, event: DynamoDBStreamEvent) => {
    return await Promise.all(
        event.Records
            .filter(_ => _.eventName !== "REMOVE")
            .map((record) => {
                const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
                return queueClient.send(record.eventName, JSON.stringify(unmarshalled));
            })
    );
}
Run Code Online (Sandbox Code Playgroud)
  • 现在测试主要逻辑要容易得多process()。您可以提供一个通过手写实现接口的模拟实例QueueClient,或者使用您喜欢的任何模拟框架
  • 对于SQSQueueClient类,单元测试没有太大好处,所以我会更多地依赖集成测试(例如使用 localstack 之类的东西)

我现在没有实际的 IDE,所以如果这里或那里有语法错误,请原谅我