在我尽力研究实现消息队列服务器的最佳方法后,我问了这个问题.为什么操作系统会限制进程和全局系统可以拥有的打开文件描述符的数量?我当前的服务器实现使用zeromq,并为每个连接的websocket客户端打开一个订阅者套接字.显然,单个进程只能将客户端处理到fds的极限.当我研究这个主题时,我发现了很多关于如何将系统限制提高到高达64k fds的信息,但它从未提到它如何影响系统性能以及为什么它开始时为1k或更低?我目前的方法是尝试使用自己的循环中的协程向所有客户端发送消息,以及所有客户端及其订阅通道的映射.但我只是想听听关于文件描述符限制以及它们如何影响试图在每个客户端级别使用持久连接的应用程序的可靠答案?
operating-system file-descriptor message-queue zeromq websocket
所以我一直试图了解消息队列和消息代理之间的区别,以及为什么应该使用一个而不是另一个。
所以据我了解。MESSAGE QUEUE 有助于进程间通信,但它基本上仅限于仅允许 2 个应用程序之间的通信?我问这个是因为例如 MSMQ(如果我的理解是正确的)只将消息存储在队列中,直到它被第一个消费者处理,之后它会自动将它从队列中删除。这样对吗 ?
现在 MESSAGE BROKERS 是 MESSAGE QUEUE 的某种扩展?因为它们提供了一种发布者 - 订阅者(S)关系的机制,就像观察者一样?
我的理解正确吗?如果是这样,两者之间还有其他区别吗?另外,您为什么要在 MESSAGE BROKER 上使用 MESSAGE QUEUE,因为您很可能会使用分布式系统,该系统肯定会由多个服务组成。
谢谢。
我注意到的一件事是,如果我不洁净地杀死一个消费者(模仿一个崩溃的程序),服务器会认为这个消费者仍然存在很长时间.结果是每个其他消息都将被忽略.
例如,如果您杀死消费者1次并重新连接,则将忽略1/2消息.如果您杀死另一个消费者,则将忽略2/3消息.如果你杀了第3个,那么将忽略3/4个消息,依此类推.
我试过打开确认,但它似乎没有帮助.我找到的唯一解决方案是手动停止服务器并重置它.
有没有更好的办法?
如何重新创建此方案
这是我的设置:
在我的settings.py文件中
BROKER_BACKEND = "djkombu.transport.DatabaseTransport"
Run Code Online (Sandbox Code Playgroud)
即我只是使用数据库来排队任务.
现在我的问题是:我有一个用户启动的任务,可能需要几分钟才能完成.我希望每个用户只运行一次任务,我会将任务的结果缓存在一个临时文件中,这样如果用户再次启动任务,我只需返回缓存的文件.我的视图函数中的代码如下所示:
task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)
if result.state == celery.states.PENDING:
# The next line makes a duplicate task if the user rapidly refreshes the page
tasks.some_long_task.apply_async(task_id=task_id)
return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
if cached_file_still_exists():
return get_cached_file()
else:
result.forget()
tasks.some_long_task.apply_async(task_id=task_id)
return HttpResponse("Task started...")
Run Code Online (Sandbox Code Playgroud)
这段代码几乎可行.但是当用户快速重新加载页面时,我遇到了问题.在任务排队和最终将任务从队列中拉出并提供给工作人员之间有1-3秒的延迟.在此期间,任务的状态仍为PENDING,这会导致视图逻辑启动重复任务.
我需要的是一些方法来判断任务是否已经提交到队列中,所以我最终不会提交两次.在芹菜中有这样做的标准方法吗?
我现在已经在一些设置中使用RabbitMQ,我无法摆脱必须有更容易设置的东西的感觉.尽管它很方便,但很难证明MQ只能处理每天处理几千条消息的解决方案,这仅仅是因为维护RabbitMQ是如此之多.
有没有人通过简单的安装和维护过程了解AMQP实施?
对于那些不了解它的人,RabbitMQ是用Erlang编写的AMQP实现.它应该是非常稳定的,但只有你对Erlang有足够的了解才能避免它的问题.无论是内存限制还是主机名的更改,总是需要深入了解它.
我想知道它们之间有什么区别.他们描述的是同一件事吗?
Google App Engine服务任务队列是Message Queue的实现吗?
我的Java应用程序向RabbitMQ交换发送消息,然后交换重定向消息到绑定队列.我在RabbitMQ中使用Springframework AMQP java插件.
问题:消息进入队列,但它保持"未确认"状态,它永远不会变为"就绪".
可能是什么原因?
我正在试图澄清亚马逊的SQS死信队列究竟在做什么.
据http://aws.typepad.com/aws/2014/01/amazon-sqs-new-dead-letter-queue.html
死信队列 - SQS队列的ARN(亚马逊资源名称),它将接收消费者收到最大数量后未成功处理的消息.
这听起来不像Poision Queue吗?关键的区别在于消费者确实收到了消息.当信息可能正常,但无法传递时,可能是由于服务中断而导致死信.http://www.eaipatterns.com/DeadLetterChannel.html
其中,因为这听起来像被成功地接收到该消息多次,但处理消息失败,这是我理解是有害消息队列的含义.
消息总线与队列
死信模式在普通旧队列的上下文中有不同的含义吗?由于SQS只是一个队列,而不是消息总线,因此它不负责传递消息.相反,它等待拾取(请求)消息.因此传统的死信模式并不真正适用,因为没有消息总线尝试传递消息而无法找到收件人.
SQS可以像消息总线一样吗?
是否有办法通过SQS设置通道和侦听器,而不是显式轮询来自队列的消息?
现在使用Azure Service结构,是否还有一个用例也可以使用单独的队列解决方案,例如Windows Service Bus?缺点可能是一个新的单点故障,但有上升空间吗?队列可以添加一些缓冲,但另一方面,Service Fabric应该能够很好地扩展并提供有状态功能,因此不需要队列缓冲区吗?
message-queue ×10
amqp ×4
rabbitmq ×4
.net ×1
amazon-sqs ×1
architecture ×1
celery ×1
django ×1
java ×1
message-bus ×1
messaging ×1
mq ×1
python ×1
websocket ×1
zeromq ×1