我正在使用带有EasyNetQ库的C#中的RabbitMQ.我在这里使用了pub/sub模式.我仍然有一些问题希望有人可以帮助我:
任何人?
我正在尝试使用EasyNetQ连接到RabbitMQ.RabbitMQ在远程VM上.
_rabbitBus = RabbitHutch.CreateBus(
string.Format("host={0};virtualhost={1}",
_hostSettings.Host, _hostSettings.VHost),
x => x.Register<IEasyNetQLogger>(l => _logger));
_rabbitBus.Subscribe<Message>(_topic, ReceiveMessage, m => m.WithTopic(_topic));
Run Code Online (Sandbox Code Playgroud)
我得到一个TimeoutException The operation requested on PersistentChannel timed out..远程VM正在回复ping,端口5672和15672打开(使用nmap检查).可以从我的主机访问RabbitMQ管理.
另外,如果RabbitMQ在我的本地机器上运行,它可以正常工作.我尝试从局域网中的其他PC连接到我的计算机上安装的RabbitMQ,它也可以工作.
我假设它与虚拟机上的事实有关,也许连接有问题.但同样,Rabbit的网络管理工作正常.
也在EasyNetQ Test应用程序上测试 - 适用于localhost,但不适用于远程.
输出如下:
DEBUG: Trying to connect
ERROR: Failed to connect to Broker: '192.168.0.13', Port: 5672 VHost: '/'.
ExceptionMessage: 'None of the specified endpoints were reachable'
ERROR: Failed to connected to any Broker. Retrying in 5000 ms
Run Code Online (Sandbox Code Playgroud)
我没有让我的未处理异常进入EasyNetQ_Default_Error_Queue,我想知道是否有一种方法可以显式地说明应该用于给定应用程序的错误队列的名称,所以错误不会在这一个EasyNetQ_Default_Error_Queue中结束吗?
我可以看到如何指定常规消息队列名称,但没有找到任何有关错误队列名称的信息.
我是消息传递的新手,目前正在调查使用RabbitMQ作为我们系统架构的一部分,以提供不同服务之间的消息传递.我有一个基本的RabbitMQ示例工作,它可以通过总线传输基本文本消息.看起来EasyNetQ可能只是使用RabbitMQ的一些复杂性,尽管我在使用它时遇到了一些麻烦.
我想发送一个由以下类表示的更高级的消息,而不仅仅是一个字符串:
public class Message
{
public string Text { get; set; }
public int RandomNumber { get; set; }
public DateTime Date { get; set; }
}
Run Code Online (Sandbox Code Playgroud)
我试图通过将其发布到队列来发送它,然后让订阅者从队列中取出它.我的代码如下:
出版者
using (var bus = RabbitHutch.CreateBus("host=localhost"))
{
var message = new Message() { Text = "Hello World", RandomNumber = new Random().Next(1,100), Date = DateTime.Now };
bus.Publish<Message>(message);
}
Run Code Online (Sandbox Code Playgroud)
接收器
using (var bus = RabbitHutch.CreateBus("host=localhost"))
{
bus.Subscribe<Message>("test", m => Console.WriteLine(string.Format("Text: {0}, RandomNumber: {1}, Date: {2}", m.Text, m.RandomNumber, …Run Code Online (Sandbox Code Playgroud) 我有一个使用EasyNetQ和RabbitMQ的Windows服务.该服务通常从服务控制管理器启动.但是我偶尔看到重启时,服务不会从服务事件日志中的错误开始:
A timeout was reached (30000 milliseconds)
The <serviceName> service failed to start due to the following error:
The service did not respond to the start or control request in a timely fashion.
Run Code Online (Sandbox Code Playgroud)
我试过自动延迟服务,这没有帮助.
另外,我正在考虑设置恢复机制,这样如果它没有启动,它会在第一次/第二次和随后的故障时重新启动.不确定这是否有效.
所以我的问题是如何确定导致我的服务无法启动的依赖性是什么?
我目前正在通过 EasyNetQ 使用 RabbitMQ 在 Windows 服务和众多客户端之间进行通信。通信是来自客户端的请求和向所有客户端推送通知的混合。我对当前解决方案的性能、可扩展性和安全性非常满意,但希望确保我不会错过最新技术中的某些内容。对于这种场景,Web API + SignalR 有什么优势(如果有)?
从我现在可以说的情况来看,当 Web 套接字可用时,SignalR 有可能提高性能,但从一开始就稍微复杂一些,如果我们需要横向扩展,就会变得更加复杂,因为需要背板。
有人可以分享任何其他见解吗?
在我的测试应用程序中,我可以看到处理过的异常消息被自动插入到默认的EasyNetQ_Default_Error_Queue中,这很棒.然后,我可以使用Hosepipe成功地转储或重新排队这些消息,这也很好,但需要下降到命令行并调用Hosepipe和RabbitMQ API来清除重试消息的队列.
所以我认为我的应用程序最简单的方法就是简单地订阅错误队列,这样我就可以使用相同的基础架构重新处理它们.但在EastNetQ中,错误队列似乎很特殊.我们需要使用正确的类型和路由ID进行订阅,因此我不确定错误队列的这些值应该是什么:
bus.Subscribe<WhatShouldThisBe>("and-this", ReprocessErrorMessage);
我可以使用简单的API订阅错误队列,还是需要深入了解高级API?
如果我的原始消息的类型是TestMessage,那么我希望能够做这样的事情:
bus.Subscribe<ErrorMessage<TestMessage>>("???", ReprocessErrorMessage);
哪个ErrorMessage是EasyNetQ提供的类来包装所有错误.这可能吗?
将EasyNetQ作为我们当前MQ通信库的替代品.
对于测试我试图简单地使用自定义命名策略将大量消息发布到交换.我的发布方法是下面的小测试方法>
public void PublishTest()
{
var advancedBus = RabbitHutch.CreateBus("host=localhost;virtualHost=Test;username=guest;password=guest;").Advanced;
var routingKey = "SimpleMessage";
// declare some objects
var queue = advancedBus.QueueDeclare("Q.TestQueue.SimpleMessage");
var exchange = advancedBus.ExchangeDeclare("E.TestExchange.SimpleMessage", ExchangeType.Direct);
var binding = advancedBus.Bind(exchange, queue, routingKey);
var message = new SimpleMessage() {Test = "HELLO"};
for (int i = 0; i < 100; i++)
{
advancedBus.Publish(exchange, routingKey, true, true, new Message<SimpleMessage>(message));
}
advancedBus.Dispose();
}
Run Code Online (Sandbox Code Playgroud)
问题是即使你创建了Exchange和队列,并且绑定得当,发布也不会产生任何结果.没有消息到达队列.Rabbit MQ管理界面中的图形甚至不显示交换上的任何活动.我错过了什么吗?代码基本上直接来自文档.
如果即时通讯使用简单的总线,只是发布,就会创建一个交换,我可以通过管理界面看到正在发布的消息.由于简单总线使用高级API发布我认为这是一个我缺少的设置问题.
我希望有人能带来一些见解:-)
/托马斯
我很好奇在 ASP.NET Core 2.x 应用程序中实现 EasyNetQ 发布/订阅模式的正确方法。具体来说,我需要确保所有这些资源的生命周期都是正确的,并且订阅线程拥有/正常运行。
我明白这IBus应该是一个单例。
标准做法是在应用程序的生命周期内创建单个 IBus 实例。当您的应用程序关闭时将其丢弃。
https://github.com/EasyNetQ/EasyNetQ/wiki/Connecting-to-RabbitMQ
所以,看起来像这样(尽管,我应该使用各种应用程序设置文件来提供特定于环境的连接字符串......让我们假设对于这个问题来说这是可以的)。
启动.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
}
Run Code Online (Sandbox Code Playgroud)
现在,我喜欢自动订阅器功能,但何时何地运行各种订阅方法并不明显。
您可以使用它轻松扫描特定程序集以查找实现 IConsume 或 IConsumeAsync 接口的类,然后让自动订阅者将这些使用者订阅到您的总线。
直接在启动上下文中运行它似乎不太正确,对吗?
AutoSubscriber,Singleton甚至是必要的?AutoSubscriber?我不希望任何其他代码需要它,我只需要它可以轻松/正确地访问Configure(似乎是运行类似方法的正确位置SubscribeAsync)。启动.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
services.AddSingleton<AutoSubscriber>(provider => new AutoSubscriber(provider.GetRequiredService<IBus>(), Assembly.GetExecutingAssembly().GetName().Name));
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.ApplicationServices.GetRequiredService<AutoSubscriber>().SubscribeAsync(Assembly.GetExecutingAssembly());
}
Run Code Online (Sandbox Code Playgroud)
我应该使用托管服务(我应该实现启动/停止方法还是BackgroundService可以)?
我正在使用 EasyNetQ,我想知道如何在不事先订阅的情况下获取现有IExchange的IQueue?
在IAdvanceBus我只能看到:
ExchangeDeclareAsyncQueueDeclareAsync但我不确定这些是否会覆盖现有队列或具有相同名称的交换?
另外,在尝试获取/声明队列或交换器之前,我如何确定该队列或交换器存在?