RabbitMQ安全设计从服务器声明队列(并从客户端使用)

g18*_*18c 5 rabbitmq

我有一个测试应用程序(首先使用RabbitMQ),它运行在部分受信任的客户端上(因为我不希望它们自己创建队列),所以我将查看客户端连接的队列和凭据的安全权限.

对于消息传递,主要是从服务器到客户端的单向广播,有时是从服务器到特定客户端的查询(回复将通过replyTo队列发送,该队列专用于服务器侦听响应的客户端) .

我目前在服务器上有一个接收功能,它寻找客户的"宣布"广播:

agentAnnounceListener.Received += (model, ea) =>
{
    var body = ea.Body;
    var props = ea.BasicProperties;
    var message = Encoding.UTF8.GetString(body);

    Console.WriteLine(
        "[{0}] from: {1}. body: {2}",
        DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime).Date,
        props.ReplyTo,
        message);

      // create return replyTo queue, snipped in next code section
};
Run Code Online (Sandbox Code Playgroud)

我想在上面的接收处理程序中创建返回主题:

var result = channel.QueueDeclare(
    queue: ea.BasicProperties.ReplyTo,
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null);
Run Code Online (Sandbox Code Playgroud)

或者,我可以将收到的通知存储在数据库中,并通过此列表运行常规计时器,并在每次通过时为每个通道声明一个队列.

在这两种情况下,服务器将在未来的某个时间点使用这个新创建的通道,以向客户端发送查询.

我的问题是:

1)在从客户端接收消息时在服务器上创建回复通道是否更好,或者如果我在外部(在计时器上)执行回复通道,是否存在用于声明已存在的队列的性能问题(可能有数千个端点) )?

2)如果客户端开始错过行为,是否有任何方式可以启动它们(在接收功能中,我可以查找每分钟有多少消息并在满足某些条件时启动)?是否有任何其他过滤器可以在管道接收之前定义,以启动发送过多邮件的客户端?

3)在上面的例子中注意我的消息在每次运行中都会不断出现(相同的旧消息),我该如何清除它们?

Zac*_*c B 1

以下是适合您的场景的一些通用架构/可靠性想法。对您的 3 个具体问题的答复在最后。

一般架构思想

我不确定服务器上声明响应队列方法是否会产生性能/稳定性优势;你必须对此进行基准测试。我认为实现您想要的最简单的拓扑如下:

  1. 每个客户端在连接时都会声明一个exclusive和/或autodelete匿名队列。如果客户端的网络连接非常粗略,以至于不希望保持打开的直接连接,那么类似于上面 Alex 提出的“Web 应用程序”,让客户端点击代表他们声明独占/自动删除队列的端点,并关闭当客户端没有足够定期联系时,连接(在消费者离开时自动删除队列)。仅当您无法调整来自客户端的 RabbitMQ 心跳以使其在网络不可靠的情况下工作,或者您可以证明您需要在 Web 应用程序层内限制队列创建速率时,才应执行此操作。
  2. 每个客户端的队列都绑定到一个广播主题交换,服务器使用它来传递广播消息(通配符路由键)或特定目标消息(仅与一个客户端的队列名称匹配的路由键)。
  3. 当服务器需要从客户端获取回复时,您可以让服务器在发送“需要响应”消息之前声明响应队列,并在消息中对响应队列进行编码(基本上是您现在正在做的事情) ,或者您可以在客户端中构建语义,在其中它们在exclusive再次尝试(互斥)消费之前停止从其广播队列消费一段固定的时间,将其响应发布到自己的队列,并确保服务器在以下时间内消费这些响应:在关闭服务器消耗并恢复正常广播语义之前分配的时间。不过,第二种方法要复杂得多,而且可能不值得。

防止客户端压垮 RabbitMQ

可以减少服务器负载并帮助防止客户端使用 RMQ 操作 DoS 您的服务器的措施包括:

  • 在所有队列上设置适当的、较低的最大长度阈值,以便服务器存储的消息量永远不会超过客户端数量的特定倍数。
  • 设置每个队列的过期时间或每个消息的过期时间,以确保过时的消息不会累积。
  • 对特定的 RabbitMQ 操作进行速率限制非常棘手,但您可以在 TCP 级别进行速率限制(使用例如 HAProxy 或其他路由器/代理堆栈),以确保您的客户端不会发送太多数据或打开太多连接,一次。根据我的经验(仅一个数据点;如果有疑问,请进行基准测试!)RabbitMQ 不太关心每次摄取的消息数量,而是关心数据量和摄取的每条消息的最大可能大小。很多小消息通常都可以;一些巨大的可能会导致延迟峰值,否则,在 TCP 层对字节进行速率限制可能会让您在必须重新评估之前将这样的系统扩展得非常远。

具体答案

鉴于上述情况,我对你的具体问题的回答是:

问:是否应该在服务器上创建回复队列以响应收到的消息?

答:是的,可能。如果您担心由此产生的队列创建率,您可以对每个服务器实例进行速率限制。看起来您正在使用 Node,因此您应该能够使用该平台的现有解决方案之一,为每个节点服务器实例提供一个队列创建速率限制器,除非您有数千台服务器(不是客户端) ),应该允许您在重新评估之前达到非常非常大的规模。

问:根据客户端操作声明队列是否会对性能产生影响?或者重新声明队列?

- 答:基准看看!重新声明可能没问题;如果您正确限制速率,您可能根本不需要担心这个问题。根据我的经验,大量的队列声明事件可能会导致延迟稍微增加,但不会破坏服务器。但这只是我的经验!每个人的场景/部署都不同,因此基准测试是无可替代的。在这种情况下,您将用稳定的消息流启动发布者/消费者,跟踪例如发布/确认延迟或消息接收延迟、rabbitmq 服务器负载/资源使用情况等。运行时,声明大量并行队列,然后看看您的指标会发生什么情况。另外,根据我的经验,队列的重新声明(幂等)不会导致任何明显的负载峰值。更重要的是观察建立新连接/渠道的速度。您还可以在每个服务器的基础上非常有效地限制队列创建的速率(请参阅我对第一个问题的回答),所以我认为如果您正确实现这一点,您将不需要很长时间担心这个问题。不过,RabbitMQ 的性能是否会因存在的队列数量(而不是声明率)而受到影响,这将是另一件需要进行基准测试的事情。

问:你可以因为行为不当而踢掉客户吗?留言费率?

答:是的,虽然设置起来有点棘手,但至少可以通过一种优雅的方式来完成。您有两个选择:

选项一:您的建议:按照您所做的那样跟踪服务器上的消息速率,并据此“踢”客户端。如果您有多个服务器,这会带来协调问题,并且需要编写位于消息接收循环中的代码,并且在 RabbitMQ 实际将消息传递给服务器的使用者之前不会发生故障。这些都是重大缺点。

选项二:使用最大长度和死信交换来构建“踢坏客户端”代理。RabbitMQ 队列的长度限制告诉队列系统“如果队列中的消息多于 X,则丢弃它们或将它们发送到死信交换(如果已配置)” 死信交换允许您将大于长度(或满足其他条件)的消息发送到特定队列/交换。以下是如何组合这些来检测发布消息过快(快于服务器消耗消息的速度)的客户端并踢掉客户端:

  1. 每个客户端都声明其主$clientID_to_server队列的最大长度为某个数字,X除非客户端“超过”服务器,否则永远不应在队列中建立该队列。该队列有一个死信主题交换ratelimit或某个常量名称。
  2. 每个客户端声明/拥有一个名为 的队列$clientID_overwhelm,最大长度为 1。该队列绑定到ratelimit路由键为 的交换机$clientID_to_server。这意味着当消息$clientID_to_server以太快的速度发布到队列而服务器无法跟上时,消息将被路由到$clientID_overwhelm,但只会保留一条消息(因此您不会填满 RabbitMQ,并且只会存储X+1每个客户端的消息)。
  3. 您启动一个简单的代理/服务,它发现(例如通过 RabbitMQ 管理 API)所有连接的客户端 ID,并使用(仅使用一个连接)所有队列中的内容*_overwhelm。每当它在该连接上收到消息时,它就会从该消息的路由键中获取客户端 ID,然后踢掉该客户端(通过在应用程序中执行带外操作;删除该客户端和$clientID_to_server队列$clientID_overwhelm,从而强制客户端下次尝试执行任何操作时都会出错;或者通过/connectionsRabbitMQ 管理 API 中的端点关闭该客户端与 RabbitMQ 的连接——这是相当侵入性的,只有在您确实需要时才应该这样做)。该服务应该非常容易编写,因为除了 RabbitMQ 之外,它不需要与系统的任何其他部分协调状态。不过,使用此解决方案,您会丢失一些来自行为不当的客户端的消息:如果您需要保留所有消息,请删除淹没队列的最大长度限制(并冒填满 RabbitMQ 的风险)。

使用这种方法,您可以根据 RabbitMQ检测垃圾邮件客户端,而不仅仅是根据您的服务器检测它们。您可以通过向客户端发送的消息添加每条消息的 TTL来扩展它,如果消息在队列中停留超过一定时间,则触发死信踢行为——这将改变伪-速率限制从“当服务器消费者落后于消息计数时”到“当服务器消费者落后于消息传递时间戳时”。

问:为什么每次运行时都会重新传送消息?如何删除它们?

答:使用确认或 noack(但可能是确认)。在“接收”中获取消息只会将其拉入您的消费者,但不会将其从队列中弹出。这就像一个数据库事务:要最终弹出它,您必须在收到它后确认它。或者,您可以以“noack”模式启动消费者,这将导致接收行为按照您假设的方式工作。但是,请注意,noack 模式会带来很大的权衡:由于 RabbitMQ 正在带外向您的消费者传递消息(基本上:即使您的服务器被锁定或睡眠,如果它已发出 ,consumerabbit 也会向其推送消息) ),如果您在 noack 模式下消费,当 RabbitMQ 将这些消息推送到服务器时,这些消息将被永久删除,因此,如果服务器在耗尽其“本地队列”和任何待接收消息之前崩溃或关闭,这些消息将丢失永远。如果不丢失消息很重要,请务必小心。