我已经使用RabbitMQ服务器并在立即字段设置为true时发布消息,我尝试发送50,000条消息并使用rabbitmqctl list_queues,我看到队列中的消息数量为零.然后我将立即标志更改为false并再次尝试发送50,000条消息然后使用rabbitmqctl list_queues我看到总共100,000条消息在队列中.(直到现在还没有消费者存在)
之后我开始使用消费者并且它消耗了所有100,000条消息.任何人都可以帮助我理解立即位字段和这种行为.另外,我无法理解强制位字段的概念.
提前感谢.
Gurpreet Singh.
我目前正在使用ActiveMQ来满足我的消息传递需求; 除了几个db故障之外,它运行良好.但是,我至少考虑尝试使用RabbitMQ.但在此之前,我想了解以下内容:
JmsTemplate
并使用DefaultMessageListener
bean将队列连接到各自的处理程序.我可以用RabbitMQ做同样的事情吗?一点背景.
非常大的单片Django应用程序.所有组件都使用相同的数据库.我们需要分离服务,以便我们可以独立升级系统的某些部分而不影响其余部分.
我们使用RabbitMQ作为Celery的经纪人.
现在我们有两个选择:
我的团队倾向于HTTP,因为这是他们熟悉的,但我认为使用RPC而不是AMQP的优势远大于它.
AMQP为我们提供了轻松添加负载平衡和高可用性以及保证消息传递的功能.
而使用HTTP我们必须创建客户端HTTP包装器以使用REST接口,我们必须放入负载平衡器并设置该基础结构以便具有HA等.
使用AMQP,我可以生成另一个服务实例,它将连接到与其他实例相同的队列以及bam,HA和负载平衡.
我对AMQP的看法是否遗漏了什么?
我想知道RabbitMQ在单个服务器上可以处理多少个最大队列?
它取决于RAM吗?它取决于erlang进程吗?
似乎我让我的Rabbitmq服务器运行的时间越长,我对未确认消息的麻烦就越多.我很乐意将它们重新排列.实际上似乎有一个amqp命令来执行此操作,但它仅适用于您的连接使用的通道.我制作了一个小的鼠兔脚本,至少尝试一下,但是我要么缺少一些东西,要么就是这样做了(用rabbitmqctl怎么样?)
import pika
credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
credentials=credentials, virtual_host='***')
def handle_delivery(body):
"""Called when we receive a message from RabbitMQ"""
print body
def on_connected(connection):
"""Called when we are fully connected to RabbitMQ"""
connection.channel(on_channel_open)
def on_channel_open(new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.basic_recover(callback=handle_delivery,requeue=True)
try:
connection = pika.SelectConnection(parameters=parameters,\
on_open_callback=on_connected)
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on …
Run Code Online (Sandbox Code Playgroud) 在Facebook应用程序和云计算时代,我正在重新思考大型多人游戏.
假设我要在现有的开放协议之上构建一些东西,我想为1,000,000个并发播放器提供服务,只是为了解决问题.
假设每个玩家都有一个传入的消息队列(用于聊天和诸如此类),平均有一个传入的消息队列(公会,区域,实例,拍卖......),因此我们有2,000,000个队列.玩家将一次收听1-10个队列.每个队列平均每秒可能有1条消息,但某些队列将具有更高的速率和更多的侦听器(例如,级别实例的"实体位置"队列).假设系统排队延迟不超过100毫秒,这对于温和的动作导向游戏来说是可以的(但不是像Quake或Unreal Tournament这样的游戏).
从其他系统,我知道在单个1U或刀片盒上为10,000个用户提供服务是一个合理的期望(假设没有其他任何昂贵的东西,比如物理模拟或诸如此类的东西).
因此,使用交叉开关集群系统,客户端连接到连接网关,然后连接到消息队列服务器,我们每个网关有100个用户,有100个网关机器,每个队列服务器有100个消息队列,有100个队列机器.再次,仅适用于一般范围.每台MQ机器上的连接数量很小:大约100,与每个网关通信.网关上的连接数量会更高:客户端为10,100,连接所有队列服务器.(除此之外,为游戏世界模拟服务器添加一些连接或诸如此类的东西,但我现在试图将它保持分离)
如果我不想从头开始构建这个,我必须使用一些存在的消息传递和/或排队基础结构.我能找到的两个开放协议是AMQP和XMPP.XMPP的预期用途更像是这个游戏系统所需要的,但开销非常明显(XML,加上冗长的状态数据,以及必须在顶部构建的各种其他通道).AMQP的实际数据模型更接近我上面描述的内容,但所有用户似乎都是大型企业级公司,工作负载似乎与工作流程相关,而不是与实时游戏更新相关.
有没有人可以分享这些技术或其实现的日常经验?
在RabbitMQ/AMQP Java客户端中,您可以创建一个AMQP.BasicProperties.Builder
,并将其用于build()
实例AMQP.BasicProperties
.然后,可以将此构建的属性实例用于各种重要事项.此构建器类上有许多"构建器"样式方法:
BasicProperties.Builder propsBuilder = new BasicProperties.Builder();
propsBuilder
.appId(???)
.clusterId(???)
.contentEncoding(???)
.contentType(???)
.correlationId(???)
.deliveryMode(2)
.expiration(???)
.headers(???)
.messageId(???)
.priority(???)
.replyTo(???)
.timestamp(???)
.type(???)
.userId(???);
Run Code Online (Sandbox Code Playgroud)
我正在寻找这些builer方法帮助"建立"的字段,最重要的是,每个字段存在哪些有效值.例如,什么是a clusterId
,它的有效值是什么?什么是type
有效值?等等.
我整个上午都在淘洗:
在所有这些文档,我无法找到明确的定义(除了一些含糊的解释是什么priority
,contentEncoding
以及deliveryMode
是)什么的每个领域都,以及他们的有效值.有人知道吗?更重要的是,有人知道这些甚至记录在哪里吗?提前致谢!
有谁知道是否有办法从客户端应用程序检查RabbitMQ队列中的消息数量?
我正在使用.NET客户端库.
作为学习RabbitMQ和python的一种方法,我正在开发一个项目,允许我在许多计算机之间分发h264编码.完成了基础知识,我有一个在Linux或Mac上运行的守护程序,它连接到队列,接受作业并使用HandBrakeCLI对它们进行编码,并在编码完成后确认消息.我还构建了一个简单的工具来将项目推入队列.
现在我想扩展将项目推入队列的工具的功能,以便我可以查看队列中的内容.我知道能够看到队列中有多少项目,但我希望能够获得实际的消息,这样我就可以显示正在等待编码的电影或电视节目.我们的想法是,队列管理器将在作业完成后从编码器客户端接收消息,然后刷新队列列表.
我知道有一种令人费解的方法可以保持队列管理器的列表与实际工作队列同步,但我希望这是"持久的",因为我应该能够关闭队列管理器并稍后重新打开以查看队列.
我在OS X上安装了RabbitMQ服务器,并在命令行启动它.现在,我不应该如何阻止它运行?我做了之后:
sudo rabbitmq-server -detached
Run Code Online (Sandbox Code Playgroud)
我明白了:
Activating RabbitMQ plugins ...
0 plugins activated:
Run Code Online (Sandbox Code Playgroud)
就是这样.我该如何正确关闭它?在文档中,它提到了使用rabbitmqctl(1)
,但我不清楚这意味着什么.谢谢.
编辑:根据下面的评论,这是我运行的sudo rabbitmqctl stop
:
(project_env)mlstr-1:Package mlstr$ sudo rabbitmqctl stop
Password:
Stopping and halting node rabbit@h002 ...
Error: unable to connect to node rabbit@h002: nodedown
DIAGNOSTICS
===========
nodes in question: [rabbit@h002]
hosts, their running nodes and ports:
- h002: [{rabbit,62428},{rabbitmqctl7069,64735}]
current node details:
- node name: rabbitmqctl7069@h002
- home dir: /opt/local/var/lib/rabbitmq
- cookie hash: q7VU0JjCd0VG7jOEF9Hf/g==
Run Code Online (Sandbox Code Playgroud)
为什么还有"当前节点"?我没有运行任何客户端程序,只有RabbitMQ服务器,这是否意味着服务器仍在运行?