我正在使用RabbitMQ生产者向消费者发送长时间运行的任务(30分钟+).问题是,当关闭服务器的连接并且未确认的任务被重新排队时,消费者仍在处理任务.
从研究中我了解到心跳或增加的连接超时 可以用来解决这个问题.这两种解决方案在尝试时都会引发错误.在阅读类似帖子的答案时,我还了解到,自发布答案后,RabbitMQ已经实施了许多更改(例如,默认心跳超时已从RabbitMQ 3.5.5之前的580更改为60).
指定心跳和阻止的连接超时时:
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
Run Code Online (Sandbox Code Playgroud)
显示以下错误:
TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'
Run Code Online (Sandbox Code Playgroud)
heartbeat_interval=1000
在连接参数中指定时,会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'
同样,socket_timeout = 1000
显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'
我在Ubuntu 14.04上运行RabbitMQ 3.6.1,pika 0.10.0和python 2.7.
我已经阅读了类似问题的答案
寻找一些代码示例来解决这个问题: -
想编写一些代码(Python或Javascript)作为RabbitMQ队列的订户,以便在收到消息时它会通过websockets将消息广播到任何连接的客户端.
我看过Autobahn和node.js(使用" amqp "和" ws "),但无法根据需要开始工作.这是使用node.js的javascript中的服务器代码: -
var amqp = require('amqp');
var WebSocketServer = require('ws').Server
var connection = amqp.createConnection({host: 'localhost'});
var wss = new WebSocketServer({port:8000});
wss.on('connection',function(ws){
ws.on('open', function() {
console.log('connected');
ws.send(Date.now().toString());
});
ws.on('message',function(message){
console.log('Received: %s',message);
ws.send(Date.now().toString());
});
});
connection.on('ready', function(){
connection.queue('MYQUEUE', {durable:true,autoDelete:false},function(queue){
console.log(' [*] Waiting for messages. To exit press CTRL+C')
queue.subscribe(function(msg){
console.log(" [x] Received from MYQUEUE %s",msg.data.toString('utf-8'));
payload = msg.data.toString('utf-8');
// HOW DOES THIS NOW GET SENT VIA WEBSOCKETS ??
});
});
});
Run Code Online (Sandbox Code Playgroud)
使用此代码,我可以成功订阅Rabbit中的队列并接收发送到队列的所有消息.同样,我可以将websocket客户端(例如浏览器)连接到服务器并发送/接收消息.但是......如何在指定的位置发送Rabbit队列消息的有效负载作为websocket消息("现在如何通过WEBSOCKET获取")?我认为这与陷入错误的回调有关,或者他们需要以某种方式嵌套......?
或者,如果这可以通过Python(通过Autobahn和pika)更容易完成,那将是很好的.
谢谢 !
我在 RabbitMQ 中有一个任务队列,其中有多个生产者 (12) 和一个消费者,用于 webapp 中的繁重任务。当我运行消费者时,它会在出现此错误之前开始将一些消息出列:
Traceback (most recent call last):
File "jobs.py", line 42, in <module> jobs[job](config)
File "/home/ec2-user/project/queue.py", line 100, in init_queue
channel.start_consuming()
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming
self.connection.process_data_events(time_limit=None)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events
self._flush_output(common_terminator)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "error(104, 'Connection reset by peer')")
Run Code Online (Sandbox Code Playgroud)
生产者代码是:
message = {'image_url': image_url, 'image_name': image_name, 'notes': notes}
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks_queue')
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))
connection.close()
Run Code Online (Sandbox Code Playgroud)
唯一的消费者代码(那个是冲突的):
def callback(self, ch, method, properties, …
Run Code Online (Sandbox Code Playgroud) 只想知道worker.py
文件中的参数名称含义:
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
Run Code Online (Sandbox Code Playgroud)
ch,方法,属性意味着什么?
在使用 pika forrabbitmq 的 Flask 应用程序中观察以下错误,
使用pika版本1.1.0rabbitmq版本3.8.5
注意:配置的心跳值是900(由于长时间运行操作而设置了较高的值,所以我必须保持pika和rabbitmq之间的连接处于活动状态)
一段时间后发生连接丢失,我必须重新启动应用程序才能再次连接rabbitmq。有人能指出我到底出了什么问题的正确方向吗!
ERROR:pika.adapters.utils.io_services_utils:_AsyncBaseTransport._consume() failed, aborting connection: error=ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None); sock=<socket.socket fd=556, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('0.0.0.0', 65083), raddr=('0.0.0.0', 5672)>; Caller's stack:
Traceback (most recent call last):
File "pika\adapters\utils\io_services_utils.py", line 1041, in _on_socket_readable
File "pika\adapters\utils\io_services_utils.py", line 791, in _consume
File "pika\adapters\utils\io_services_utils.py", line 79, in retry_sigint_wrap
File "pika\adapters\utils\io_services_utils.py", line 846, in _sigint_safe_recv
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host …
Run Code Online (Sandbox Code Playgroud) 我有一个兔子 mq 在机器上运行
我在初始化 RPC 连接时收到随机超时
/usr/lib/python2.6/site-packages/pika-0.9.5-py2.6.egg/pika/adapters/blocking_connection.py
问题是超时不一致并且不时发生。
当手动测试这个问题并从同一台机器上运行 blocks_connection.py 1000 次时,它失败了,不会产生超时。
这是我失败时得到的错误:
2013-04-23 08:24:23,396 runtest-trigger.24397 24397 DEBUG producer_rabbit initiate_rpc_connection Connecting to RabbitMQ RPC queue rpcqueue_java on host: auto-db1
2013-04-23 08:24:25,350 runtest-trigger.24397 24397 ERROR testrunner go Run 1354: cought exception: timed out
Traceback (most recent call last):
File "/testrunner.py", line 193, in go
self.set_runparams(jobid)
File "/testrunner.py", line 483, in set_runparams
self.runparams.producers_testrun = self.initialize_producers_testrun(self.runparams)
File "/basehandler.py", line 114, …
Run Code Online (Sandbox Code Playgroud) 我在 kubernetes 集群中使用 pika 并使用队列中的消息,这会触发在新线程中启动函数。然而 RabbitMQ 似乎崩溃了,这些是我迄今为止找到的最好的日志:
2020-12-23 10:39:10,906] WARNING - WRITE indicated on fd=9, but writer callback is None; events=0b100 {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/selector_ioloop_adapter.py:393}
(repeats to a total of n=38 times)
2020-12-23 10:39:10,908] ERROR - _AsyncBaseTransport._produce() failed, aborting connection: error=IndexError('pop from an empty deque'); sock=<socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 44892), raddr=('192.168.101.201', 5672)>; Caller's stack:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
self._produce()
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 822, in _produce
chunk = self._tx_buffers.popleft()
IndexError: pop from an empty deque …
Run Code Online (Sandbox Code Playgroud) 我安装了在mac上运行python的pika
sudo pip install pika==0.9.8
Run Code Online (Sandbox Code Playgroud)
我确定已安装,这是尝试重新安装时的响应,
Requirement already satisfied (use --upgrade to upgrade): pika==0.9.8 in /usr/local/lib/python2.7/site-packages
Run Code Online (Sandbox Code Playgroud)
但是当我在*.py上导入鼠兔时,我得到了,
ImportError:没有名为pika的模块
我的python版本
python --version
Python 2.7.5
Run Code Online (Sandbox Code Playgroud)
如何使这个模块被Python识别?谢谢!
编辑1
我这样称之为导入
import pika
Run Code Online (Sandbox Code Playgroud)
这给出了错误,我可以用这个临时修复,
export PYTHONPATH=$PYTHONPATH:/usr/local/lib/python2.7/site-packages
Run Code Online (Sandbox Code Playgroud)
那么如何使路径永久化?谢谢!
我在连接到 RabbitMQ 实例时遇到问题(这是我第一次这样做)。我已经在 AWS 上启动了一个,并获得了我可以访问的管理面板的访问权限。
我正在尝试使用以下代码连接到 python/pika 中的 RabbitMQ 服务器:
import pika
import logging
logging.basicConfig(level=logging.DEBUG)
credentials = pika.PlainCredentials('*******', '**********')
parameters = pika.ConnectionParameters(host='a-25c34e4d-a3eb-32de-abfg-l95d931afc72f.mq.us-west-1.amazonaws.com',
port=5671,
virtual_host='/',
credentials=credentials,
)
connection = pika.BlockingConnection(parameters)
Run Code Online (Sandbox Code Playgroud)
pika.exceptions.IncompatibleProtocolError: StreamLostError: ("Stream connection lost: ConnectionResetError(54, 'Connection reset by peer')",)
当我运行上面的代码时我得到了。
通过stackoverflow搜索并发布这个问题,因为没有解决方案对我有用,我的问题可能与其他问题不同。
我正在编写一个脚本,它从rabbitMQ 队列中获取一篇文章并处理该文章以计算单词并从中提取关键字并将其转储到数据库中。我的脚本工作正常,但执行一段时间后,我收到此异常
(-1, "ConnectionResetError(104, 'Connection reset by peer')")
我不知道为什么我会得到这个。我已经尝试了很多在 stackover 流上可用的解决方案,但没有一个对我有用。我写了我的脚本并以两种不同的方式进行了尝试。两者都工作正常,但一段时间后发生相同的异常。
这是我的第一个代码:
def app_main():
global channel, results, speedvars
Logger.log_message('Starting app main')
# Edit 4
def pika_connect():
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
print ("In pika connect")
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
Logger.log_message('Starting loop')
channel.start_consuming()
#########
speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()
sender = ResultsSender(results, speedvars)
sender.start()
# Edit 5 starting 10 threads to listen to pika
for th in range(qthreads):
Logger.log_message('Starting thread: …
Run Code Online (Sandbox Code Playgroud) 以下代码是我用来计算消费者数量的代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='IP ADDRESS'))
channel = connection.channel()
this=channel.queue_declare(queue="Queue_name",passive=True)
print this.method.consumer_count
Run Code Online (Sandbox Code Playgroud)
现在我获得的数量是活跃消费者的数量。但是,当消费者从队列中消费时,此计数将打印为零。现在我需要从队列中消费的消费者总数。这出现了 RabbitMQ 管理(作为消费者:0 活动 25 总计)
有没有办法在队列中有消息时获取从队列中消费的消费者总数的计数?
先感谢您
我有 2 个队列,比如说 q1 和 q2,它们对应于具有绑定密钥 b1 和 b2 的 e1 和 e2 交换。我想并行运行消费者函数,比如 c1 和 c2,它们将分别监听 q1 和 q2。我尝试了以下方法:
def c1():
connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
channel = connection.channel()
channel.exchange_declare(exchange='e1', durable='true',
type='topic')
result = channel.queue_declare(durable='false', queue='q1')
queue_name = result.method.queue
binding_key = "b1"
channel.queue_bind(exchange='e1',
queue=queue_name,
routing_key=binding_key)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
channel.start_consuming()
def c2():
connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
channel = connection.channel()
channel.exchange_declare(exchange='e2', durable='true',
type='topic')
result = channel.queue_declare(durable='false', queue='q2')
queue_name = result.method.queue
binding_key = "b2"
channel.queue_bind(exchange=e1,
queue=queue_name,
routing_key=binding_key)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
channel.start_consuming()
if __name__ == '__main__':
c1()
c2()
Run Code Online (Sandbox Code Playgroud)
但是,它只监听 c1 …
我有一个使用rabbitmq代理的客户端服务器应用程序.客户端连接到rabbitmq并将消息发送到服务器.在某些时候,如果服务器决定不应该将此客户端连接到rabbitmq,我希望能够强制断开客户端与rabbitmq边界的连接.请注意,在我的情况下,我不想发送消息给客户端断开连接,在服务器端我想强制断开这个客户端与rabbitmq.
找不到api这样做.任何帮助都是适当的.
python-pika ×13
rabbitmq ×12
pika ×10
python ×7
amqp ×1
node-amqp ×1
node.js ×1
python-2.7 ×1
python-3.x ×1
rpc ×1
websocket ×1