标签: rabbitmq

在Django中放置消息队列使用者的位置?

我正在使用Carrot作为Django项目中的消息队列,并按照教程,它工作正常.但是这个例子在控制台中运行,我想知道如何在Django中应用它.我在models.py中从我的一个模型调用的发布者类,所以没关系.但我不知道在哪里放消费类.

因为它只是与.wait()坐在那里,我不知道我需要在什么时间或地点实例化它,以便它始终运行并监听消息!

谢谢!

python django message-queue amqp rabbitmq

1
推荐指数
1
解决办法
2041
查看次数

使用带有rails的rabbitmq,如何创建无限循环过程?

在rails web app中,如果我将消息写入像rabbitmq这样的队列,那么当生产者向队列发送消息时,客户端将如何得到通知?

我猜我必须创建一个在后台运行的单独进程来响应消息是否正确?即此代码超出了Web应用程序的范围.

如果是这种情况,是否可以重新使用rails应用程序中的模型/库?我必须在2个地方复制此代码吗?

ruby-on-rails rabbitmq

1
推荐指数
1
解决办法
1008
查看次数

我们可以使用python在rabbitmq中创建队列吗?

我正在研究需要通过代码控制发送队列的项目.所以我只是好奇有人用python/django代码在rabbitmq中创建队列?:)

python django rabbitmq

1
推荐指数
1
解决办法
1005
查看次数

当尝试模拟rabbitmq ConnectionFactory时,Mockito在/然后返回时始终在Scala Test中返回null

我正在尝试使用scalatest和mockito来模拟RabbitMQ ConnectionFactory对象以返回模拟连接.以下是我正在使用的示例测试:

class RabbitMQMockTest extends FunSuite with MockitoSugar {
    test("RabbitMQ ConnectionFactory is correctly mocked") {
        def connectionFactory = mock[ConnectionFactory]
        def connection = mock[Connection]

        when(connectionFactory.newConnection()).thenReturn(connection)
        println(connectionFactory.newConnection())

        assert(connectionFactory.newConnection() != null)
    }
}
Run Code Online (Sandbox Code Playgroud)

这总是失败,println语句总是打印"null".我一起使用这些技术非常新,并且想知道是否有人有任何建议或者如果我做错了什么我可以告诉我.提前致谢!

scala mockito rabbitmq scalatest

1
推荐指数
1
解决办法
3035
查看次数

有没有办法从 MassTransit 发送原始消息?

我通过 Masstransit 向rabbitmq交换“x”发送一些消息,如下所示:

var endpoint = await _bus.GetSendEndpoint(new Uri("exchange:x"));
var message = new CustomType {
  accountId = 1
};
await endpoint.Send(message);
Run Code Online (Sandbox Code Playgroud)

我在队列中收到的事件如下所示:

{
  "messageId": "x",
  "conversationId": "x",
  "sourceAddress": "rabbitmq://localhost/x",
  "destinationAddress": "rabbitmq://localhost/x",
  "messageType": [
    "urn:message:x"
  ],
  "message": {
    "accountId": 1
  },
  "sentTime": "x",
  "headers": {
    "MT-Activity-Id": "x"
  },
  "host": {
    ...
  }
}
Run Code Online (Sandbox Code Playgroud)

但我希望队列中的消息如下:

{
    "accountId": 1   
}
Run Code Online (Sandbox Code Playgroud)

Masstransit 有什么方法可以将原始消息发送到队列吗?

masstransit rabbitmq

1
推荐指数
1
解决办法
1655
查看次数

Kafka 和 RabbitMQ 的公共交通配置

我想将 Mass Transit 与 RabbitMQ 和 Kafka 结合使用作为消息代理。
在部署阶段可以决定使用哪一个来传输消息。

我知道什么是 ConsumeContext、ISendEndpointProvider 或 IPublishEndpoint。根据 链接,该接口在 RabbitMQ 和 Kafka 之间共享,用于发布消息并使用该消息。
我在 .Net Core 应用程序启动时配置了 RabbitMQ 和 Kafka,并通过部署阶段的 appSetting 可以决定必须注册哪一个。

这种方法对于在我的应用程序中同时配置 Kafka 和 RabbitMq 是否正确?

masstransit rabbitmq

1
推荐指数
1
解决办法
1969
查看次数

NestJS 微服务错误“没有匹配的消息处理程序”

我正在构建一个应用程序,其中的微服务通过 RabbitMQ(请求-响应模式)进行通信。
一切正常,但我仍然遇到错误“远程服务中没有定义匹配的消息处理程序”的问题。-
当我将 POST 发送到客户端应用程序时,它应该只是通过客户端(ClientProxy)发送带有数据的消息,并且消费者应用程序应该响应。该功能确实有效,但总是只能第二次。我知道这听起来很奇怪,但在我的第一个 POST 请求中,总是有来自客户端的错误,而我的每一个 POST 请求都有效。然而这个问题在我的整个应用程序中随处可见,因此特定的 POST 请求仅用于示例。

这是代码:

客户:

@Post('devices')
async pushDevices(
    @Body(new ParseArrayPipe({ items: DeviceDto }))
    devices: DeviceDto[]
    ) {
    this.logger.log('Devices received'); 
    return this.client.send(NEW_DEVICES_RECEIVED, devices)
}
Run Code Online (Sandbox Code Playgroud)

消费者:

 @MessagePattern(NEW_DEVICES_RECEIVED)
 async pushDevices(@Payload() devices: any, @Ctx() context: RmqContext) {
    console.log('RECEIVED DEVICES');
    console.log(devices);
    const channel = context.getChannelRef();
    const originalMsg = context.getMessage();
    channel.ack(originalMsg);
    return 'ANSWER';
  }
Run Code Online (Sandbox Code Playgroud)

客户端的 RMQ 设置为queueOptions: {durable: true},消费者也有queueOptions: {durable: true}noAck: false

请您知道可能会导致问题的原因吗?我尝试使用 JSON.stringify …

rabbitmq docker microservices nestjs

1
推荐指数
1
解决办法
4562
查看次数

RabbitMQ 没有向我显示在 Spring 消费者和发布者应用程序中创建的队列和交换

我正在尝试使用 RabbitMQ 在春季创建发布者和消费者应用程序。一切工作正常,当发布者发送消息时,消费者接收并成功消费它。但正如您在下图中看到的,RabbitMQ 界面没有向我显示创建的队列和交换。

无需排队

在此输入图像描述

无交换

在此输入图像描述

这是我写的代码:

RABBITMQ 配置(发布者应用程序和消费者应用程序中相同)

package com.example.rabbitmq.springrabbitmqconsumer.configuration;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String ROUTING_A = "routing.A";
    public static final String ROUTING_B = "routing.B";

    //QUEUES
    @Bean
    Queue queueA() {
        return new Queue("queue.A", false);
    }
    @Bean
    Queue queueB() {
        return new Queue("queue.B", false);
    }

    //Direct Exchange
    @Bean
    DirectExchange exchange() {
        return new DirectExchange("exchange.direct");
    }

    //BINDINGS …
Run Code Online (Sandbox Code Playgroud)

java spring rabbitmq docker

1
推荐指数
1
解决办法
948
查看次数

MassTransit 在同步上下文中发布消息的速度非常慢

我遇到一种情况,我应该通过 RabbitMQ 同步发布消息(旧代码),否则消息将乱序,因为 MassTransit 在不同的线程中发布

    public void PostUserQuantitySync(int userId, decimal amount)
{
    foreach (var item in Enumerable.Range(0, 1000))
    {
        var _ = _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item });
    }
    return Ok();
}
Run Code Online (Sandbox Code Playgroud)

因此,我使用了 TaskUtil.Await 和/或 Wait(),但发布性能非常差(每秒 33/s 消息),而纯兔子客户端的结果要好得多(至少每秒 200/s 消息)并且尊重消息订购:

    public void PostUserQuantitySync(int userId, decimal amount)
    {
        foreach (var item in Enumerable.Range(0, 1000))
        {
            TaskUtil.Await(() _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item }, c => c.SetAwaitAck(false)));
        }
    }
Run Code Online (Sandbox Code Playgroud)

MassTransit 在同步上下文中是否存在任何性能问题,或者我应该在代码中使用任何调整吗?

.net c# masstransit rabbitmq async-await

1
推荐指数
1
解决办法
2677
查看次数

RabbitMQ 无法从 docker-compose 启动

我正在尝试通过 docker-compose 命令设置rabbitmq 实例。

我的 docker 撰写 yaml

version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    hostname: rabbit
    container_name: 'rabbitmq'
    volumes:
      - ./etc/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./data:/var/lib/rabbitmq/mnesia/rabbit@rabbit
      - ./logs:/var/log/rabbitmq/log
      - ./etc/ssl/CERT_LAB_CA.pem:/etc/rabbitmq/ssl/cacert.pem
      - ./etc/ssl/CERT_LAB_RABBITMQ.pem:/etc/rabbitmq/ssl/cert.pem
      - ./etc/ssl/KEY_LAB_RABBITMQ.pem:/etc/rabbitmq/ssl/key.pem
    ports:
      - 5672:5672
      - 15672:15672
      - 15671:15671
      - 5671:5671
    environment:
      - RABBITMQ_DEFAULT_USER=secret
      - RABBITMQ_DEFAULT_PASS=secret
Run Code Online (Sandbox Code Playgroud)

当我第一次运行 docker compose up 时,一切正常。但是当我添加队列并交换(从定义.json 加载)、关闭并删除容器并尝试再次 docker compose up 时,出现此错误

2022-09-29 13:32:09.522956+00:00 [notice] <0.44.0> Application mnesia exited with reason: stopped
2022-09-29 13:32:09.523096+00:00 [error] <0.229.0>
2022-09-29 13:32:09.523096+00:00 [error] <0.229.0> BOOT FAILED
2022-09-29 13:32:09.523096+00:00 [error] <0.229.0> =========== …
Run Code Online (Sandbox Code Playgroud)

ubuntu rabbitmq docker docker-compose

1
推荐指数
1
解决办法
3924
查看次数