我想在几个线程中使用进程消息但是在执行此代码时我遇到错误:
from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback
def doWork(body, args, channel):
r = random.random()
time.sleep(r * 10)
try:
channel.basic_ack(delivery_tag=args.delivery_tag)
except :
traceback.print_exc()
auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()
while True:
time.sleep(0.03)
try:
method_frame, header_frame, body = channel.basic_get(queue="test_queue")
if method_frame.NAME == 'Basic.GetEmpty':
continue
t = threading.Thread(target=doWork, …Run Code Online (Sandbox Code Playgroud) 我正在使用rabbitmq来处理我工作的企业应用程序的数据库之间的消息.作为流程的一部分,我正在尝试帮助自动化服务器的设置(例如脚本).在此过程中,我尝试使用rabbitmqadmin从命令提示符声明交换.我有2个不同的服务器在运行CentOS 5.x,CentOS 6.x并且两个都有相同的问题.
到目前为止,我宣布了一个管理员用户并设置了密码,然后我将其标记设置为管理员标记,然后我确保它具有对vhost的权限.之后,我尝试声明交换,指定用户和密码,它失败.
rabbitmqctl add_user administrator password
rabbitmqctl set_user_tags administrator administrator
rabbitmqctl set_permissions -p / administrator ".*" ".*" ".*"
rabbitmqctl add_vhost vhostFoo
rabbitmqctl set_permissions -p vhostFoo administrator ".*" ".*" ".*"
rabbitmqadmin -u administrator -p password declare exchange --vhost=vhostFoo name=exchangeNew type=direct
Run Code Online (Sandbox Code Playgroud)
对于CentOS 5.x盒子,我必须使用python26 rabbitmqadmin而不仅仅是rabbitmqadmin,但得到相同的结果.
最后一个命令导致:
*** Access refused: /api/exchanges/vhostFoo/exchangeNew
Run Code Online (Sandbox Code Playgroud)
我的CentOS 5.x和Cent 6.x盒子都在运行rabbitmq 3.3.5.关于我缺少什么的想法或者我如何获得有关错误的更多信息的想法(例如错误日志文件或获得更详细输出的方式)?
我正在尝试设计一种重播机制,使用户能够重放队列中的消息.我为包含多个队列和多个消费者的交换而设计的最佳设计是:
创建一个记录器服务,它将:
订阅者请求重播.

问题:
1.这有意义吗?
我发明了轮子吗?有兔子固有的解决方案吗?插入?
3.创建多个交易所是否被视为良好做法?
在此解决方案中,创建每个队列的交换以便发布相同的消息.
另一种解决方案:
1.为每个队列创建一个额外的队列"ReplayQueue".设置一个TTL(假设一个月).
2.每次用户请求重播时,让他从自己的ReplayQueue重放,而不需要进行重放.
这个解决方案有点问题,因为.
我string通过一个rabbitmq消息系统.发送之前,
我使用json.Marshal,转换结果string并通过rabbitmq发送.
我转换和发送的结构可以是:(更改了结构的名称和大小,但它无关紧要)
type Somthing1 struct{
Thing string `json:"thing"`
OtherThing int64 `json:"other_thing"`
}
Run Code Online (Sandbox Code Playgroud)
要么
type Somthing2 struct{
Croc int `json:"croc"`
Odile bool `json:"odile"`
}
Run Code Online (Sandbox Code Playgroud)
消息完美地作为a string打印并打印在另一侧(某些服务器)
到目前为止一切正常.现在我正在尝试将它们转换回结构并断言类型.
第一次尝试是:
func typeAssert(msg string) {
var input interface{}
json.Unmarshal([]byte(msg), &input)
switch input.(type){
case Somthing1:
job := Somthing1{}
job = input.(Somthing1)
queueResults(job)
case Somthing2:
stats := Somthing2{}
stats = input.(Somthing2)
queueStatsRes(stats)
default:
}
Run Code Online (Sandbox Code Playgroud)
这不起作用.在打印input解组后的类型时,我得到map[string]interface{}(?!?)
甚至比这更奇怪,地图键是我得到的字符串,地图值是空的.
我做了一些其他的尝试,如:
func typeAssert(msg string) {
var input interface{} …Run Code Online (Sandbox Code Playgroud) 我想纵向和横向扩展我的Node.js Socket应用程序,但我还没有找到一个复杂的解决方案.
我的应用程序有两个用例:
一方面,我读到我需要Redis 和socket.io-redis 这两种情况
另一方面,我观看了这个视频并阅读了这个SO答案,其中说Redis不可靠并且不能保证发布的消息会到达,所以你应该只用它来进行聚类/垂直缩放
使用ServiceBus的 Microsoft Azures解决方案是不可能的,因为我不想使用Azure.
该家伙建议使用RabbitMQ进行水平缩放,而不是Redis .
对于垂直扩展,还有socket.io-clusterhub,一个用于节点进程的IPC,但它似乎只适用于Socket.io <= v0.9.0
然后就是这个人,他已经实现了自己的方法,通过HTTP请求将消息传递给其他节点,这在某种程度上是有道理的.但是为什么HTTP请求你是否也可以在服务器之间建立直接套接字连接,同时将消息推送到所有服务器并克服从一个服务器到另一个服务器的延迟?
作为结论,我想也许我可以在EACH服务器上使用Redis,只是为了在多个进程上集群我的应用程序时交换消息,以及RabbitMQ作为S2S通信解决方案.
但是,每个服务器和另一个中央RabbitMQ有一个Redis似乎有点过分.
是否有任何已知的更短/更好的解决方案可以在两个方向上可靠地扩展Socket.io?
编辑:我曾尝试将单个Redis服务器用于多个Node.js服务器,其中每个服务器都通过粘性会话在所有核心上使用群集.虽然Clustering本身就像一个带有redis的魅力,但在使用多个服务器时似乎存在问题.消息不会到达其他节点.
我正在尝试让Django Celery工作者连接到RabbitMQ服务器,所有服务器都运行在同一主机上.
但是,当我运行时,manage.py celery worker我得到的是:
[2013-06-11 17:33:41,185: WARNING/MainProcess] celery@localhost has started.
[2013-06-11 17:33:44,192: ERROR/MainProcess] Consumer: Connection Error: Socket closed. Trying again in 2 seconds...
[2013-06-11 17:33:50,203: ERROR/MainProcess] Consumer: Connection Error: Socket closed. Trying again in 4 seconds...
[2013-06-11 17:34:03,214: ERROR/MainProcess] Consumer: Connection Error: Socket closed. Trying again in 6 seconds...
[2013-06-11 17:34:27,232: ERROR/MainProcess] Consumer: Connection Error: Socket closed. Trying again in 8 seconds...
Run Code Online (Sandbox Code Playgroud)
当我检查我的时候,/var/log/rabbitmq/rabbit@localhost.log我看到了几条消息:
=ERROR REPORT==== 11-Jun-2013::17:33:44 ===
exception on TCP connection <0.201.0> from 127.0.0.1:43461
{channel0_error,opening, …Run Code Online (Sandbox Code Playgroud) 我已经为RabbitMQ rabbitmq.config文件配置了新的端口号,即带有SSL的5671.
现在我要禁用默认端口,即5672.
配置文件如下: -
[
{rabbit, [
{ssl_listeners, [5671]},
{ssl_options, [{cacertfile,"/ay/app/xxx/softwares/rabbitmq_server-3.1.1/etc/ssl/cacert.pem"},
{certfile,"/ay/app/xxx/softwares/rabbitmq_server-3.1.1/etc/ssl/cert.pem"},
{keyfile,"/ay/app/xxx/softwares/rabbitmq_server-3.1.1/etc/ssl/key.pem"},
{verify,verify_peer},
{fail_if_no_peer_cert,false},
{ciphers,[{dhe_rsa,aes_256_cbc,sha},
{dhe_dss,aes_256_cbc,sha},
{rsa,aes_256_cbc,sha}]}
]
}
]}
].
Run Code Online (Sandbox Code Playgroud)
现在它在端口5671和5672上工作.但是我需要禁用端口5672.给出一些意见或建议.
提前致谢.
我在通过SocketIO从RabbitMQ向用户发送消息时遇到问题.
我有与SocketIO集成的 Flask应用程序.目前的用户流程似乎如此
问题是我无法设置RabbitMQ监听器,它通过SocketIO将消息转发给浏览器.每次我得到不同的错误.主要是连接关闭,或者我在应用程序上下文之外工作.
我尝试了很多方法,这是我的最后一个方法.
# callback
def mq_listen(uid):
rabbit = RabbitMQ()
def cb(ch, method, properties, body, mq=rabbit):
to_return = [0] # mutable
message = Message.load(body)
to_return[0] = message.get_message()
emit('report_part', {"data": to_return[0]})
rabbit.listen('results', callback=cb, id=uid)
# this is the page, which user reach
@blueprint.route('/report_result/<uid>', methods=['GET'])
def report_result(uid):
thread = threading.Thread(target=mq_listen, args=(uid,))
thread.start()
return render_template("property/report_result.html", socket_id=uid)
Run Code Online (Sandbox Code Playgroud)
其中rabbit.listen方法是抽象的:
def listen(self, queue_name, callback=None, id=None):
if callback is not None:
callback_function = callback
else:
callback_function = self.__callback
if id is None:
self.channel.queue_declare(queue=queue_name, durable=True) …Run Code Online (Sandbox Code Playgroud) 我对此做了大量的研究,我很惊讶我还没有找到一个好的答案.
我正在Heroku上运行一个大型应用程序,我有一些芹菜任务运行了很长时间的处理,并在任务结束时保存结果.每次我在Heroku上重新部署时,它都会发送SIGTERM(最终是SIGKILL)并杀死我的跑步工作者.我正在尝试找到一种方法让worker实例优雅地自行关闭并重新排队以便稍后处理,这样最终我们可以保存所需的结果而不是丢失排队的任务.
我找不到一种让工作人员正确地监听SIGTERM的方法.我得到的最接近的,在python manage.py celeryd直接运行时有效但在使用工头模拟Heroku时没有,如下:
@app.task(bind=True, max_retries=1)
def slow(self, x):
try:
for x in range(100):
print 'x: ' + unicode(x)
time.sleep(10)
except exceptions.MaxRetriesExceededError:
logger.error('whoa')
except (exceptions.WorkerShutdown, exceptions.WorkerTerminate) as exc:
logger.error(u'retrying, ' + unicode(exc))
raise self.retry(exc=exc, countdown=10)
except (KeyboardInterrupt, SystemExit) as exc:
print 'retrying'
raise self.retry(exc=exc, countdown=10)
else:
return x
finally:
logger.info('task ended!')
Run Code Online (Sandbox Code Playgroud)
当我开始在领班内运行芹菜任务并按Ctrl + C时,会发生以下情况:
^CSIGINT received
22:20:59 system | sending SIGTERM to all processes
22:20:59 web.1 | exited with code 0
22:21:04 system | sending …Run Code Online (Sandbox Code Playgroud) 我正在按照本指南学习如何使用spring-rabbitRabbitMQ.但是在本指南中,RabbitMQ配置是默认配置(localhost服务器,凭证为guest/guest).如果我想用ip地址和凭证连接到远程RabbitMQ,我该怎么办?我不知道在我的应用程序中将这些信息设置在何处.
rabbitmq spring-rabbit spring-amqp spring-boot spring-rabbitmq
rabbitmq ×10
python ×3
celery ×2
socket.io ×2
django ×1
flask ×1
go ×1
heroku ×1
node.js ×1
pika ×1
rabbitmqctl ×1
redis ×1
replay ×1
scalability ×1
sigterm ×1
spring-amqp ×1
spring-boot ×1