我现在已经在一些设置中使用RabbitMQ,我无法摆脱必须有更容易设置的东西的感觉.尽管它很方便,但很难证明MQ只能处理每天处理几千条消息的解决方案,这仅仅是因为维护RabbitMQ是如此之多.
有没有人通过简单的安装和维护过程了解AMQP实施?
对于那些不了解它的人,RabbitMQ是用Erlang编写的AMQP实现.它应该是非常稳定的,但只有你对Erlang有足够的了解才能避免它的问题.无论是内存限制还是主机名的更改,总是需要深入了解它.
这可能是一个非常简单的答案,但我没有在MassTransit文档或论坛中看到明显的解决方案.
如果有一些消息已经移到RabbitMQ中的错误队列中,那么将它们带回处理队列的最佳机制是什么?此外,是否有任何内置的记录,为什么他们首先在那里移动?
我试图使用Rabbitmq服务器由于某种原因连接突然关闭,即使我传递了正确的用户名和密码.
Rabbitmq服务器在端口5672上运行,并通过端口5672 telnet到我的服务器说它运行正常.
我在CentOS安装了rabbitmq服务器,我的rabbitmq服务器日志如下:
=INFO REPORT==== 19-Dec-2012::06:25:44 ===
accepted TCP connection on [::]:5672 from <host>:42048
=INFO REPORT==== 19-Dec-2012::06:25:44 ===
starting TCP connection <0.357.0> from <host>:42048
=WARNING REPORT==== 19-Dec-2012::06:25:44 ===
exception on TCP connection <0.357.0> from <host>:42048
connection_closed_abruptly
=INFO REPORT==== 19-Dec-2012::06:25:44 ===
closing TCP connection <0.357.0> from <host>:42048
Run Code Online (Sandbox Code Playgroud)
可能出现这种情况的原因可能是什么.
谢谢
我们使用RabbitMQ服务器在应用程序之间进行消息传递.我们需要为进入Rabbit服务器的所有amqp消息创建一个中央日志.我们的目的不是临时调试,而是可审计性.理想情况下,我可以先登录到指定的文件,然后再登录到Logly或Splunk等外部系统.
我已经探索过打开Firehose并使用跟踪插件,但是这些队列并不耐用.如果在启动日志记录后将新队列和交换添加到虚拟主机,我也不确定这些解决方案是否有效.这些工具似乎更适合我需要的临时调试.
我很想听听你的想法.在这一点上,我担心我必须设置一个网络监视器来拦截和记录消息,然后才能到达Rabbit.
我在Centos远程服务器上安装并设置了Rabbitmq.后来我创建了一个文件"rabbitmq.config"并添加了该行
[{rabbit,[{loopback_users,[]}]}]
然后重新启动rabbitmq服务器.再次尝试使用访客凭证从我的本地计算机登录rabbitmq管理Web界面,但获取
登录失败
错误消息.在Centos中清空Rabbitmq的环回用户设置的正确方法是什么.
我对码头工人,芹菜和兔子相对较新.
在我们的项目中,我们目前有以下设置:1个运行多个docker容器的物理主机:
1x rabbitmq:3管理容器
# pull image from docker hub and install
docker pull rabbitmq:3-management
# run docker image
docker run -d -e RABBITMQ_NODENAME=my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
Run Code Online (Sandbox Code Playgroud)
1x芹菜容器
# pull docker image from docker hub
docker pull celery
# run celery container
docker run --link some-rabbit:rabbit --name some-celery -d celery
Run Code Online (Sandbox Code Playgroud)
(还有一些容器,但它们不应该对问题做任何事情)
任务文件
为了更好地了解celery和rabbitmq,我在物理主机上创建了一个tasks.py文件:
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://guest:guest@172.17.0.81/')
@app.task(name='tasks.add')
def add(x, y):
return x + y
Run Code Online (Sandbox Code Playgroud)
实际上整个设置似乎工作得很好.所以当我在tasks.py所在的目录中打开一个python shell并运行时
>>> from tasks import …Run Code Online (Sandbox Code Playgroud) 有没有办法确定是否有任何任务丢失并重试?
我认为丢失的原因可能是调度程序错误或工作线程崩溃.
我打算重试它们,但我不确定如何确定哪些任务需要退役?
以及如何自动完成此过程?我可以使用自己的自定义调度程序来创建新任务吗?
编辑:我从文档中发现RabbitMQ从未松散任务,但是当工作线程在任务执行过程中崩溃时会发生什么?
我有一个python worker客户端,它会旋转10个worker,每个worker挂钩到RabbitMQ队列.有点像这样:
#!/usr/bin/python
worker_count=10
def mqworker(queue, configurer):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
channel = connection.channel()
channel.queue_declare(queue=qname, durable=True)
channel.basic_consume(callback,queue=qname,no_ack=False)
channel.basic_qos(prefetch_count=1)
channel.start_consuming()
def callback(ch, method, properties, body):
doSomeWork();
ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__ == '__main__':
for i in range(worker_count):
worker = multiprocessing.Process(target=mqworker)
worker.start()
Run Code Online (Sandbox Code Playgroud)
我遇到的问题是,尽管在通道上设置了basic_qos,但是第一个启动的工作人员接受队列中的所有消息,而其他人则闲置在那里.我可以在rabbitmq界面中看到这一点,即使我设置worker_count为1并在队列中转储50条消息,所有50条消息进入"未确认"桶,而我预计1会成为未确认的,而另外49条将成为准备.
为什么这不起作用?
使用RabbitMQ发送消息时,您基本上有交换,队列和绑定.我理解他们的想法以及他们如何相互联系,但我不确定是谁设置了什么.
基本上,我的应用程序中有三个场景.
我想要实现的是一个将消息发送到队列的组件,并且应该有几个处理该队列中的项的工作进程.这对我来说似乎很容易.设置如下:
每当将消息发送到交换机时,它就会被传递到队列,并且工作进程会完成其任务.
一切都应该是耐用的.
那么谁设置了什么?在我看来:
对?
第二种情况完全不同.基本上,它是一个发布/子方案,其中每条消息都发送到每个当前正在侦听的客户端.如果客户端脱机,它不再接收消息,并且不会存储在任何地方.这意味着以下设置:
那么谁设置了什么?在我看来:
对?
基本上与方案2相同,但如果消费者离线,则不应丢失消息.在我看来,这不应该改变任何东西 - 对吗?
我想明确撤销芹菜的任务.这就是我目前的做法: -
from celery.task.control import revoke
revoke(task_id, terminate=True)
Run Code Online (Sandbox Code Playgroud)
task_id在哪里string(也尝试将其转换为UUID uuid.UUID(task_id).hex).
在上述过程之后,当我再次启动芹菜时,celery worker -A proj它仍会消耗相同的消息并开始处理它.为什么?
查看时flower,消息仍在代理部分中.如何删除邮件以使其无法再次使用?