ppd*_*oll 5 python redis celery
我在一个关于人脸识别的项目中使用 celery 作为 MQ。
我有三个任务队列“task_gpu0、task_gpu1、task_download”为gunicorn Flask服务器提供服务,该服务器使用redis作为代理和后端。
当我使用 jmeter 对服务器施加压力时,大约 20 分钟后,程序引发异常:
操作错误:写入套接字时出现错误 104。连接被对等方重置。
在检查redis日志时,我发现信息客户端中的client_recent_max_output_buffer和connected_clients继续增加。但是当我通过redis桌面管理器检查键值时,结果是可以的。
不知道为什么redis输出缓冲区和connected_clients不断增加。
雷迪斯日志:
:15:M 01 Apr 2019 09:11:18.113 # Client id=58081 addr=172.16.3.22:33832 fd=54 name= age=428 idle=0 flags=P db=1 sub=74995 psub=0 multi=-1 qbuf=79 qbuf-free=32689 obl=0 oll=452 omem=9267808 events=rw cmd=subscribe scheduled to be closed ASAP for overcoming of output buffer limits.
info clients:
connected_clients:587
client_recent_max_input_buffer:4
client_recent_max_output_buffer:55524832
blocked_clients:2
Run Code Online (Sandbox Code Playgroud)
任务下载.py
from celery import Celery
app = Celery()
app.config_from_object("celery_app_tmp.celeryconfig")
@app.task
def download(addImageInput, faceSetId):
download_someting()
return result_dict
Run Code Online (Sandbox Code Playgroud)
task0.py 与task1.py 相同
from celery import Celery
app = Celery()
app.config_from_object("celery_app_tmp.celeryconfig")
@app.task
def faceRec(addImageInput, faceSetId):
do someting()
return result_dict
Run Code Online (Sandbox Code Playgroud)
celeryconfig.py
from kombu import Queue
from kombu import Exchange
result_serializer = 'msgpack'
task_serializer = 'msgpack'
accept_content = ['json', 'msgpack']
broker_url = "redis://:redis@172.16.3.22:7369/1"
result_backend = "redis://:redis@172.16.3.22:7369/1"
worker_concurrency = 8
result_exchange_type = 'direct'
result_expires = 5
task_queues = (
Queue('gpu_0', exchange=Exchange('gpu_0'), routing_key='gpu_0'),
Queue('gpu_1', exchange=Exchange('gpu_1'), routing_key='gpu_1'),
)
task_routes = {
'celery_app_tmp.task0.faceRec': {'queue': 'gpu_0', 'routing_key': 'gpu_0'},
'celery_app_tmp.task1.faceRec': {'queue': 'gpu_1', 'routing_key': 'gpu_1'},
'celery_app_tmp.task_download.download': {'queue': 'download', 'routing_key': 'download'},
'celery_app_tmp.task_download.delFile': {'queue': 'download', 'routing_key': 'download'}
}
Run Code Online (Sandbox Code Playgroud)
主要.py
import celery_app_tmp.task0
import celery_app_tmp.task1
import celery_app_tmp.task_download
result_exc = chain(celery_app_tmp.task_download.download.s(addImageInput), random.choice(list_task).faceRec.s(faceSetId))()
while True:
if result_exc.ready():
dict_exc = result_exc.get()
break
Run Code Online (Sandbox Code Playgroud)