芹菜?redis中的client_recent_max_output_buffer和connected_clients持续增加

ppd*_*oll 5 python redis celery

我在一个关于人脸识别的项目中使用 celery 作为 MQ。

我有三个任务队列“task_gpu0、task_gpu1、task_download”为gunicorn Flask服务器提供服务,该服务器使用redis作为代理和后端。

当我使用 jmeter 对服务器施加压力时,大约 20 分钟后,程序引发异常:

操作错误:写入套接字时出现错误 104。连接被对等方重置。

在检查redis日志时,我发现信息客户端中的client_recent_max_output_buffer和connected_clients继续增加。但是当我通过redis桌面管理器检查键值时,结果是可以的。

不知道为什么redis输出缓冲区和connected_clients不断增加。

雷迪斯日志:

:15:M 01 Apr 2019 09:11:18.113 # Client id=58081 addr=172.16.3.22:33832 fd=54 name= age=428 idle=0 flags=P db=1 sub=74995 psub=0 multi=-1 qbuf=79 qbuf-free=32689 obl=0 oll=452 omem=9267808 events=rw cmd=subscribe scheduled to be closed ASAP for overcoming of output buffer limits.

info clients:
connected_clients:587
client_recent_max_input_buffer:4
client_recent_max_output_buffer:55524832
blocked_clients:2
Run Code Online (Sandbox Code Playgroud)

任务下载.py

from celery import Celery
app = Celery()
app.config_from_object("celery_app_tmp.celeryconfig")

@app.task
def download(addImageInput, faceSetId):
    download_someting()
    return result_dict
Run Code Online (Sandbox Code Playgroud)

task0.py 与task1.py 相同

from celery import Celery
app = Celery()
app.config_from_object("celery_app_tmp.celeryconfig")

@app.task
def faceRec(addImageInput, faceSetId):
    do someting()
    return result_dict
Run Code Online (Sandbox Code Playgroud)

celeryconfig.py

from kombu import Queue
from kombu import Exchange

result_serializer = 'msgpack'
task_serializer = 'msgpack'
accept_content = ['json', 'msgpack']

broker_url = "redis://:redis@172.16.3.22:7369/1"
result_backend = "redis://:redis@172.16.3.22:7369/1"

worker_concurrency = 8

result_exchange_type = 'direct'
result_expires = 5

task_queues = (
    Queue('gpu_0', exchange=Exchange('gpu_0'), routing_key='gpu_0'),
    Queue('gpu_1', exchange=Exchange('gpu_1'), routing_key='gpu_1'),

)

task_routes = {

                'celery_app_tmp.task0.faceRec': {'queue': 'gpu_0', 'routing_key': 'gpu_0'},
                'celery_app_tmp.task1.faceRec': {'queue': 'gpu_1', 'routing_key': 'gpu_1'},
                'celery_app_tmp.task_download.download': {'queue': 'download', 'routing_key': 'download'},
                'celery_app_tmp.task_download.delFile': {'queue': 'download', 'routing_key': 'download'}
}
Run Code Online (Sandbox Code Playgroud)

主要.py

import celery_app_tmp.task0
import celery_app_tmp.task1
import celery_app_tmp.task_download


result_exc = chain(celery_app_tmp.task_download.download.s(addImageInput), random.choice(list_task).faceRec.s(faceSetId))()
while True:
    if result_exc.ready():
        dict_exc = result_exc.get()
        break
Run Code Online (Sandbox Code Playgroud)