自动化RabbitMQ消费者测试

Yah*_*ein 12 integration-testing rabbitmq microservices

我有一个.net微服务使用RabbitMQ客户端接收消息,我需要测试以下内容:

1- consumer已成功连接到rabbitMq主机.

2-消费者正在听队列.

3-消费者正在成功接收消息.

为了实现上述目标,我创建了一个发送消息的示例应用程序,我正在调试消费者以确保它正在接收消息.

如何自动完成此测试?因此将它包含在我的微服务CI中.

我想将我的示例应用程序包含在我的CI中,这样我就可以发出一条消息,然后运行一个等待特定时间的消费者单元测试,然后在收到消息时通过,但这对我来说似乎是错误的做法,因为测试无法启动直到几秒钟才会触发消息.

我想到的另一种方法是从单元测试本身触发示例应用程序,但如果示例应用程序无法工作,则会导致服务故障.

是否有通过RabbitMQ连接的微服务集成测试的最佳实践?

Van*_*tly 8

我已经建立了许多这样的测试。我在这里使用.NET Core 2.0Github上抛出了一些基本代码

这些自动化测试将需要RabbitMQ集群。每次测试都从消除队列开始,以确保不存在任何消息。来自另一个测试的现有消息将中断当前测试。

我有一个简单的助手来删除队列。在我的应用程序中,它们总是声明自己的队列,但是如果不是您自己的情况,那么您将不得不再次创建队列以及对任何交换的任何绑定。

public class QueueDestroyer
{
    public static void DeleteQueue(string queueName, string virtualHost)
    {
        var connectionFactory = new ConnectionFactory();
        connectionFactory.HostName = "localhost";
        connectionFactory.UserName = "guest";
        connectionFactory.Password = "guest";
        connectionFactory.VirtualHost = virtualHost;
        var connection = connectionFactory.CreateConnection();
        var channel = connection.CreateModel();
        channel.QueueDelete(queueName);
        connection.Close();
    }
}
Run Code Online (Sandbox Code Playgroud)

我创建了一个非常简单的消费者示例,该示例代表您的微服务。它在任务中运行,直到取消。

public class Consumer
{
    private IMessageProcessor _messageProcessor;
    private Task _consumerTask;

    public Consumer(IMessageProcessor messageProcessor)
    {
        _messageProcessor = messageProcessor;
    }

    public void Consume(CancellationToken token, string queueName)
    {
        _consumerTask = Task.Run(() =>
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: queueName,
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        _messageProcessor.ProcessMessage(message);
                    };
                    channel.BasicConsume(queue: queueName,
                                        autoAck: false,
                                            consumer: consumer);

                    while (!token.IsCancellationRequested)
                        Thread.Sleep(1000);
                }
            }
        });
    }

    public void WaitForCompletion()
    {
        _consumerTask.Wait();
    }

}
Run Code Online (Sandbox Code Playgroud)

使用者具有IMessageProcessor接口,该接口将执行处理消息的工作。在集成测试中,我创建了一个假冒产品。您可能会为此使用首选的模拟框架。

测试发布者将消息发布到队列。

public class TestPublisher
{
    public void Publish(string queueName, string message)
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName="guest", Password="guest" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                    routingKey: queueName,
                                    basicProperties: null,
                                    body: body);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我的示例测试如下所示:

[Fact]
public void If_SendMessageToQueue_ThenConsumerReceiv4es()
{
    // ARRANGE
    QueueDestroyer.DeleteQueue("queueX", "/");
    var cts = new CancellationTokenSource();
    var fake = new FakeProcessor();
    var myMicroService = new Consumer(fake);

    // ACT
    myMicroService.Consume(cts.Token, "queueX");

    var producer = new TestPublisher();
    producer.Publish("queueX", "hello");

    Thread.Sleep(1000); // make sure the consumer will have received the message
    cts.Cancel();

    // ASSERT
    Assert.Equal(1, fake.Messages.Count);
    Assert.Equal("hello", fake.Messages[0]);
}
Run Code Online (Sandbox Code Playgroud)

我的假货是这样的:

public class FakeProcessor : IMessageProcessor
{
    public List<string> Messages { get; set; }

    public FakeProcessor()
    {
        Messages = new List<string>();
    }

    public void ProcessMessage(string message)
    {
        Messages.Add(message);
    }
}
Run Code Online (Sandbox Code Playgroud)

其他建议是:

  • 如果您可以将随机文本添加到队列中,并在每次测试运行时交换名称,则应这样做,以避免并发测试相互干扰

  • 如果您的应用程序不这样做,我的代码中还会有一些帮助程序来声明队列,交换和绑定。

  • 编写一个连接终止类,该连接终止类将强制关闭连接并检查您的应用程序是否仍然可以正常工作并可以恢复。我有相应的代码,但.NET Core中没有。只需问我,我就可以对其进行修改以在.NET Core中运行。

  • 总的来说,我认为您应该避免在集成测试中包括其他微服务。例如,如果您将消息从一项服务发送到另一项服务,并且希望收到一条消息,则创建一个可以模拟预期行为的虚假使用者。如果您收到来自其他服务的消息,请在集成测试项目中创建伪造的发布者。


Bar*_*cki 5

我成功地做了这样的测试.您需要RabbitMQ的测试实例,测试交换以发送消息并测试队列以连接以接收消息.

不要嘲笑一切!

但是,对于RabbitMQ的测试使用者,生产者和测试实例,该测试中没有实际的生产代码.

使用测试rabbitMQ实例和真正的应用程序

为了进行meaniningfull测试,我会使用测试RabbitMQ实例,交换和队列,但留下真正的应用程序(生产者和消费者).

我将实现以下场景

  1. 当测试应用程序做一些测试消息给rabbitMQ

  2. 然后增加rabbitMQ中接收消息的数量

  3. 应用程序执行接收消息时应该执行的操作

步骤1和3是特定于应用程序的.您的应用程序根据某些外部事件(收到HTTP消息?计时器事件?)向rabbitMQ发送消息.您可以在测试中重现此类条件,因此应用程序将发送消息(以测试rabbitMQ实例).

用于在接收消息时验证应用程序操作的相同故事.应用程序应该在收到消息时做一些可观察的事情.如果应用程序进行HTTP调用,那么您可以模拟该HTTP端点并验证收到的消息.如果应用程序将消息保存到数据库,您可以将数据库池化以查找消息.

使用rabbitMQ监控API

可以使用RabbitMQ监控API实现第2步(有一些方法可以查看从队列https://www.rabbitmq.com/monitoring.html#rabbitmq-metrics收到和使用的消息数)

考虑使用弹簧靴进行健康检查

如果你是基于java的,那么使用Spring Boot将大大简化你的问题.您将自动获得RabbitMQ连接的健康检查!

有关如何使用Spring启动连接到RabbitMQ的教程,请参阅https://spring.io/guides/gs/messaging-rabbitmq/.Spring启动应用程序为每个连接的外部资源(数据库,消息传递,jms等)公开健康信息(使用HTTP端点/运行状况).请参阅https://docs.spring.io/spring-boot/docs/current/reference/html/ production-ready-endpoints.html#_auto_configured_healthindicators了解详情.

如果与rabbitMQ的连接断开,那么运行状况检查(由org.springframework.boot.actuate.amqp.RabbitHealthIndicator完成)将在JSON正文中返回HTTP代码4xx和meaninfull json消息.

您不需要做任何特别的事情来进行健康检查 - 只需使用org.springframework.boot:spring-boot-starter-amqp作为maven/gradle依赖就足够了.

CI测试 - 来自src/test目录

我在src/test目录中使用集成测试编写了这样的测试(连接到RabbitMQ的外部测试实例).如果使用Spring Boot,最简单的方法是使用测试配置文件,并在application-test.properties中测试RabbitMQ实例的连接细节(生产可以使用生产配置文件,以及带有RabbitMQ生产实例的application-production.properties文件).

在最简单的情况下(只需验证与rabbitMQ的连接),您只需要正常启动应用程序并验证/健康端点.

在这种情况下,我会做CI步骤

  • 构建一个(gradle build)
  • 一个运行单元测试(没有任何外部依赖的测试)
  • 一个运行集成测试

CI测试 - 外部

上述方法也可以用于部署到测试环境的应用程序(并连接到测试rabbitMQ实例).应用程序启动后,您可以检查/ health endpoint以确保它已连接到rabbitMQ实例.

如果您让应用程序向rabbitMQ发送消息,那么您可以观察rabbbitMQ指标(使用rabbitMQ监控API)并观察应用程序消耗的消息的外部影响.

对于此类测试,您需要从CI启动并部署应用程序,以便开始测试.

对于那种情况,我会遵循CI步骤

  • 构建应用程序的步骤
  • 在src/test目录中运行所有测试的步骤(单元,集成)
  • 将app部署到测试环境或启动dockerized应用程序的步骤
  • 运行外部测试的步骤
  • 对于dockerized环境,停止docker容器的步骤

考虑码头化的环境

对于外部测试,您可以在Docker中运行应用程序以及测试RabbitMQ实例.您将需要两个docker容器.

要运行这两个图像,最合适的是编写docker-compose文件.