芹菜工人睡眠无法正常工作

Lui*_*deu 6 python sleep worker celery

我有下一个问题,我在 Python 上使用的进程必须等待 X 秒,该进程本身可以正常工作,问题是当我将其作为 celery 的任务时。

当工作人员尝试对一项任务执行 time.sleep(X) 时,它会暂停工作人员中的所有任务,例如:

我有工人 A,它可以同时执行 4 个任务(q、w、e 和 r),任务 r 的睡眠时间为 1800 秒,因此工人同时执行 4 个任务,但是当r 任务做睡眠,工人也停止 q、w 和 e。

这是正常的吗?你知道我该如何解决这个问题吗?

编辑:这是带有我的节拍和队列的 celery.py 示例

app.conf.update(
CELERY_DEFAULT_QUEUE='default',
CELERY_QUEUES=(
    Queue('search', routing_key='search.#'),
    Queue('tests', routing_key='tests.#'),
    Queue('default',    routing_key='tasks.#'),
),

CELERY_DEFAULT_EXCHANGE='tasks',
CELERY_DEFAULT_EXCHANGE_TYPE='topic',
CELERY_DEFAULT_ROUTING_KEY='tasks.default',
CELERY_TASK_RESULT_EXPIRES=10,
CELERYD_TASK_SOFT_TIME_LIMIT=1800,
CELERY_ROUTES={
    'tests.tasks.volume': {
        'queue': 'tests',
        'routing_key': 'tests.volume',
    },
    'tests.tasks.summary': {
        'queue': 'tests',
        'routing_key': 'tests.summary',
    },
    'search.tasks.links': {
        'queue': 'search',
        'routing_key': 'search.links',
    },
    'search.tasks.urls': {
        'queue': 'search',
        'routing_key': 'search.urls',
    },
},

CELERYBEAT_SCHEDULE={
    # heavy one
    'each-hour-summary': {
        'task': 'tests.tasks.summary',
        'schedule': crontab(minute='0', hour='*/1'),
        'args': (),
    },
    'each-hour-volume': {
        'task': 'tests.tasks.volume',
        'schedule': crontab(minute='0', hour='*/1'),
        'args': (),
    },
    'links-each-cuarter': {
        'task': 'search.tasks.links',
        'schedule': crontab(minute='*/15'),
        'args': (),
    },
    'urls-each-ten': {
        'schedule': crontab(minute='*/10'),
        'task': 'search.tasks.urls',
        'args': (),
    },
}
)
Run Code Online (Sandbox Code Playgroud)

测试任务.py

@app.task
def summary():
    execute_sumary() #heavy task ~ 1 hour aprox

@app.task
def volume():
    execute_volume() #no important ~ less than 5 minutes
Run Code Online (Sandbox Code Playgroud)

和 search.tasks.py

@app.task
def links():
    free = search_links() #return boolean
    if free:
        process_links()
    else:
        time.sleep(1080) #<--------sleep with which I have problems
    process_links()

@app.task
def urls():
    execute_urls() #no important ~ less than 1 minute
Run Code Online (Sandbox Code Playgroud)

好吧,我有 2 个工人,A 用于队列搜索,B 用于测试和默认值。

问题在于 A,当它接受任务“链接”并执行 time.sleep() 时,它会停止工作人员正在执行的其他任务。

因为工人 B 工作正常,我认为问题出在 time.sleep() 函数上。

Dej*_*kic 5

如果你只有一个进程/线程,调用 sleep() 会阻塞它。这意味着没有其他任务将运行...