使用RabbitMQ的工作池和多租户队列

bak*_*san 13 worker amqp rabbitmq

我致力于一个基于多租户云的应用程序的Web应用程序(许多客户端,每个客户端都有自己独立的"环境",但都在共享的硬件集上),我们正在引入用户批量处理的能力为以后的处理工作.批量工作的类型实际上并不重要,只有足够的数量才能在没有工作队列的情况下进行,这实际上并不实际.我们选择RabbitMQ作为底层队列框架.

因为我们是一个多租户应用程序,所以我们不一定希望客户端能够为另一个客户端造成冗长的队列处理时间,因此我们浮出水面的一个想法是在每个客户端基础上创建一个队列并拥有一个共享工作池指向所有客户端队列.问题在于,就我所能想到的最好,工人直接绑定到特定的队列,而不是交换.在我们理想的世界中,我们的客户端队列仍将被处理,而没有一个客户端阻止另一个客户端队列,我们​​可以通过启动更多工作人员或关闭空闲工作人员来根据需要增长或缩小共享工作者池.将工作人员绑定到特定队列会使我们在实际意义上无法做到这一点,因为我们经常会有很多工作人员闲置在没有活动的队列上.

是否有相对直接的目标来实现这一目标?我是RabbitMQ的新手,并没有真正完成我们所追求的目标.我们也不想编写一个非常复杂的多线程消费者应用程序,这是我们可能负担不起的开发时间和测试时间的时间.我们的堆栈是以Windows/.Net/C#为基础的,如果它是germaine,但我不认为这应该对手头的问题有重大影响.

Las*_*sus 6

您可以查看优先级队列实现(最初提出这个问题时尚未实现): https: //www.rabbitmq.com/priority.html

如果这对您不起作用,您可以尝试其他一些技巧来实现您想要的效果(应该适用于旧版本的 RabbitMQ):

您可以将 100 个队列绑定到主题交换,并将路由键设置为用户 ID % 100 的哈希值,即每个任务将具有 1 到 100 之间的键,并且同一用户的任务将具有相同的键。每个队列都与 1 到 100 之间的唯一模式绑定。现在,您拥有一组工作人员,它们以随机队列号开始,然后在每个作业后递增该队列号,再次 % 100 在队列 100 后循环回到队列 1。

现在,您的工作人员队列可以并行处理最多 100 个唯一用户,或者如果没有其他工作要做,所有工作人员都可以专注于单个用户。如果工作人员需要在每个作业之间循环遍历所有 100 个队列,那么在只有单个用户在单个队列上有大量作业的情况下,每个作业之间自然会产生一些开销。减少队列数量是解决此问题的一种方法。您还可以让每个工作线程保持与每个队列的连接,并使用每个队列中最多一条未确认的消息。如果未确认的消息超时设置得足够高,工作人员就可以更快地循环处理内存中的待处理消息。

或者,您可以创建两个交换器,每个交换器都有一个绑定队列。所有工作都进入第一个交换器和队列,由工作池消耗。如果一个工作单元花费的时间太长,工作人员可以取消它并将其推送到第二个队列。当第一个队列上没有任何内容时,工作人员仅处理第二个队列。您可能还需要几个具有相反队列优先级的工作人员,以确保当有永无休止的短任务流到达时,长时间运行的任务仍然得到处理,以便最终始终处理用户批次。这不会真正将您的工作人员队列分布在所有任务中,但它会阻止一个用户的长时间运行任务,从而阻止您的工作人员为同一用户或另一个用户执行短期运行任务。它还假设您可以取消作业并稍后重新运行它,不会出现任何问题。这还意味着超时并需要以低优先级重新运行的任务将会浪费资源。除非你能提前识别快任务和慢任务

如果单个用户有 100 个缓慢任务,然后另一个用户发布一批任务,则第一个建议的 100 个队列也可能存在问题。在其中一项缓慢任务完成之前,不会查看这些任务。如果事实证明这是一个合理的问题,您可以将这两种解决方案结合起来。


Dav*_*sot 1

您可以让您的工作人员池全部使用相同的唯一队列。然后,工作将在它们之间分配,您将能够扩大/缩小您的池,以增加/减少您的工作处理能力。

  • 我不是在问将多个工作人员分配到同一个队列,而是在问相反的情况。我希望有限的工作人员池能够从大量(我们称之为~500)队列中进行消费。 (2认同)
  • 我已经亲自尝试过这种方法,但它并不完美:很难找到合适的启发式方法来处理所有这些队列。您是否首先处理最满的队列?或者那些有较旧消息的人?在这两种情况下,您都脱离了 AMQP 协议,并且必须开始处理 Rabbit 管理 API。然后你会想:让我们拥有与工作人员相同数量的队列,然后在 500 个 Q 和工作人员队列之间添加一些一致的哈希映射。然后你意识到你所需要的只是一个队列和n个在其上竞争的工人。 (2认同)