我正在使用Carrot作为Django项目中的消息队列,并按照教程,它工作正常.但是这个例子在控制台中运行,我想知道如何在Django中应用它.我在models.py中从我的一个模型调用的发布者类,所以没关系.但我不知道在哪里放消费类.
因为它只是与.wait()坐在那里,我不知道我需要在什么时间或地点实例化它,以便它始终运行并监听消息!
谢谢!
在rails web app中,如果我将消息写入像rabbitmq这样的队列,那么当生产者向队列发送消息时,客户端将如何得到通知?
我猜我必须创建一个在后台运行的单独进程来响应消息是否正确?即此代码超出了Web应用程序的范围.
如果是这种情况,是否可以重新使用rails应用程序中的模型/库?我必须在2个地方复制此代码吗?
我正在研究需要通过代码控制发送队列的项目.所以我只是好奇有人用python/django代码在rabbitmq中创建队列?:)
我正在尝试使用scalatest和mockito来模拟RabbitMQ ConnectionFactory对象以返回模拟连接.以下是我正在使用的示例测试:
class RabbitMQMockTest extends FunSuite with MockitoSugar {
test("RabbitMQ ConnectionFactory is correctly mocked") {
def connectionFactory = mock[ConnectionFactory]
def connection = mock[Connection]
when(connectionFactory.newConnection()).thenReturn(connection)
println(connectionFactory.newConnection())
assert(connectionFactory.newConnection() != null)
}
}
Run Code Online (Sandbox Code Playgroud)
这总是失败,println语句总是打印"null".我一起使用这些技术非常新,并且想知道是否有人有任何建议或者如果我做错了什么我可以告诉我.提前致谢!
我通过 Masstransit 向rabbitmq交换“x”发送一些消息,如下所示:
var endpoint = await _bus.GetSendEndpoint(new Uri("exchange:x"));
var message = new CustomType {
accountId = 1
};
await endpoint.Send(message);
Run Code Online (Sandbox Code Playgroud)
我在队列中收到的事件如下所示:
{
"messageId": "x",
"conversationId": "x",
"sourceAddress": "rabbitmq://localhost/x",
"destinationAddress": "rabbitmq://localhost/x",
"messageType": [
"urn:message:x"
],
"message": {
"accountId": 1
},
"sentTime": "x",
"headers": {
"MT-Activity-Id": "x"
},
"host": {
...
}
}
Run Code Online (Sandbox Code Playgroud)
但我希望队列中的消息如下:
{
"accountId": 1
}
Run Code Online (Sandbox Code Playgroud)
Masstransit 有什么方法可以将原始消息发送到队列吗?
我想将 Mass Transit 与 RabbitMQ 和 Kafka 结合使用作为消息代理。
在部署阶段可以决定使用哪一个来传输消息。
我知道什么是 ConsumeContext、ISendEndpointProvider 或 IPublishEndpoint。根据 链接,该接口在 RabbitMQ 和 Kafka 之间共享,用于发布消息并使用该消息。
我在 .Net Core 应用程序启动时配置了 RabbitMQ 和 Kafka,并通过部署阶段的 appSetting 可以决定必须注册哪一个。
这种方法对于在我的应用程序中同时配置 Kafka 和 RabbitMq 是否正确?
我正在构建一个应用程序,其中的微服务通过 RabbitMQ(请求-响应模式)进行通信。
一切正常,但我仍然遇到错误“远程服务中没有定义匹配的消息处理程序”的问题。-
当我将 POST 发送到客户端应用程序时,它应该只是通过客户端(ClientProxy)发送带有数据的消息,并且消费者应用程序应该响应。该功能确实有效,但总是只能第二次。我知道这听起来很奇怪,但在我的第一个 POST 请求中,总是有来自客户端的错误,而我的每一个 POST 请求都有效。然而这个问题在我的整个应用程序中随处可见,因此特定的 POST 请求仅用于示例。
这是代码:
客户:
@Post('devices')
async pushDevices(
@Body(new ParseArrayPipe({ items: DeviceDto }))
devices: DeviceDto[]
) {
this.logger.log('Devices received');
return this.client.send(NEW_DEVICES_RECEIVED, devices)
}
Run Code Online (Sandbox Code Playgroud)
消费者:
@MessagePattern(NEW_DEVICES_RECEIVED)
async pushDevices(@Payload() devices: any, @Ctx() context: RmqContext) {
console.log('RECEIVED DEVICES');
console.log(devices);
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
return 'ANSWER';
}
Run Code Online (Sandbox Code Playgroud)
客户端的 RMQ 设置为queueOptions: {durable: true},消费者也有queueOptions: {durable: true}和noAck: false
请您知道可能会导致问题的原因吗?我尝试使用 JSON.stringify …
我正在尝试使用 RabbitMQ 在春季创建发布者和消费者应用程序。一切工作正常,当发布者发送消息时,消费者接收并成功消费它。但正如您在下图中看到的,RabbitMQ 界面没有向我显示创建的队列和交换。
无需排队
无交换
这是我写的代码:
RABBITMQ 配置(发布者应用程序和消费者应用程序中相同)
package com.example.rabbitmq.springrabbitmqconsumer.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String ROUTING_A = "routing.A";
public static final String ROUTING_B = "routing.B";
//QUEUES
@Bean
Queue queueA() {
return new Queue("queue.A", false);
}
@Bean
Queue queueB() {
return new Queue("queue.B", false);
}
//Direct Exchange
@Bean
DirectExchange exchange() {
return new DirectExchange("exchange.direct");
}
//BINDINGS …Run Code Online (Sandbox Code Playgroud) 我遇到一种情况,我应该通过 RabbitMQ 同步发布消息(旧代码),否则消息将乱序,因为 MassTransit 在不同的线程中发布
public void PostUserQuantitySync(int userId, decimal amount)
{
foreach (var item in Enumerable.Range(0, 1000))
{
var _ = _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item });
}
return Ok();
}
Run Code Online (Sandbox Code Playgroud)
因此,我使用了 TaskUtil.Await 和/或 Wait(),但发布性能非常差(每秒 33/s 消息),而纯兔子客户端的结果要好得多(至少每秒 200/s 消息)并且尊重消息订购:
public void PostUserQuantitySync(int userId, decimal amount)
{
foreach (var item in Enumerable.Range(0, 1000))
{
TaskUtil.Await(() _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item }, c => c.SetAwaitAck(false)));
}
}
Run Code Online (Sandbox Code Playgroud)
MassTransit 在同步上下文中是否存在任何性能问题,或者我应该在代码中使用任何调整吗?
我正在尝试通过 docker-compose 命令设置rabbitmq 实例。
我的 docker 撰写 yaml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
hostname: rabbit
container_name: 'rabbitmq'
volumes:
- ./etc/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./data:/var/lib/rabbitmq/mnesia/rabbit@rabbit
- ./logs:/var/log/rabbitmq/log
- ./etc/ssl/CERT_LAB_CA.pem:/etc/rabbitmq/ssl/cacert.pem
- ./etc/ssl/CERT_LAB_RABBITMQ.pem:/etc/rabbitmq/ssl/cert.pem
- ./etc/ssl/KEY_LAB_RABBITMQ.pem:/etc/rabbitmq/ssl/key.pem
ports:
- 5672:5672
- 15672:15672
- 15671:15671
- 5671:5671
environment:
- RABBITMQ_DEFAULT_USER=secret
- RABBITMQ_DEFAULT_PASS=secret
Run Code Online (Sandbox Code Playgroud)
当我第一次运行 docker compose up 时,一切正常。但是当我添加队列并交换(从定义.json 加载)、关闭并删除容器并尝试再次 docker compose up 时,出现此错误
2022-09-29 13:32:09.522956+00:00 [notice] <0.44.0> Application mnesia exited with reason: stopped
2022-09-29 13:32:09.523096+00:00 [error] <0.229.0>
2022-09-29 13:32:09.523096+00:00 [error] <0.229.0> BOOT FAILED
2022-09-29 13:32:09.523096+00:00 [error] <0.229.0> =========== …Run Code Online (Sandbox Code Playgroud)