如何在 python-rq 中在计划作业和排队作业之间创建“depends_on”关系

ost*_*ler 5 python task-queue redis python-rq

我有一个 Web 服务(Python 3.7、Flask 1.0.2),其工作流由 3 个步骤组成:

  • 步骤 1:将远程计算作业提交到商业排队系统(IBM 的 LSF)
  • 第 2 步:每 61 秒轮询一次远程计算作业状态(61 秒因为缓存作业状态结果)
  • 第 3 步:如果第 2 步返回远程计算作业状态 ==“DONE”,则数据后处理

远程计算作业的长度是任意的(在秒和天之间),每一步都依赖于上一步的完成:

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue()
    job1 = q.enqueue(step1)
    job2 = q.enqueue(step2, depends_on=job1)
    job3 = q.enqueue(step3, depends_on=job2)
Run Code Online (Sandbox Code Playgroud)

但是,最终所有工作人员(4 名工作人员)都将进行轮询(4 个客户端请求中的第 2 步),而他们应该继续执行其他传入请求的第 1 步和已成功通过第 2 步的那些工作流的第 3 步。

工人应该在每次投票后被释放。他们应该定期返回第 2 步进行下一次轮询(每个作业最多每 61 秒一次),如果远程计算作业轮询未返回“DONE”,则重新排队轮询作业。


此时我开始使用rq-scheduler(因为间隔和重新排队功能听起来很有希望):

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue()
    s = Scheduler('default')

    job1 = q.enqueue(step1, REQ_ID)

    job2 = Job.create(step2, (REQ_ID,), depends_on=job1)
    job2.meta['interval'] = 61
    job2.origin = 'default'
    job2.save()
    s.enqueue_job(job2)

    job3 = q.enqueue(step3, REQ_ID, depends_on=job2)
Run Code Online (Sandbox Code Playgroud)

Job2 被正确创建(包括与depends_onjob1的关系,但 s.enqueue_job() 立即执行它,忽略它与 job1 的关系。(q.enqueue_job() 的函数文档字符串实际上说它立即执行......) .

depends_on当 job2 放入调度程序而不是队列时,如何创建job1、job2 和 job3 之间的关系?(或者,如何将 job2 交给调度程序,而不立即执行 job2 并等待 job1 完成?)


出于测试目的,步骤如下所示:

def step1():
    print(f'*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...', flush=True)
    time.sleep(20)
    print(f'    <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED', flush=True)
    return True

def step2():
    print(f'    --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...', flush=True)
    time.sleep(10)
    print(f'    <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED', flush=True)
    return True

def step3():
    print(f'    --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...', flush=True)
    time.sleep(10)
    print(f'*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED', flush=True)
    return True
Run Code Online (Sandbox Code Playgroud)

我收到的输出是这样的:

worker_1     | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9)
worker_2     | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58)
worker_2     |     --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED...
worker_1     | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED...
...
Run Code Online (Sandbox Code Playgroud)

job2 不等待 job1 完成...


#requirements.txt
Flask==1.0.2
Flask-Bootstrap==3.3.7.1
Flask-Testing==0.7.1
Flask-WTF==0.14.2
redis==3.3.11
rq==0.13
rq_scheduler==0.9.1
Run Code Online (Sandbox Code Playgroud)

ost*_*ler 2

我对这个问题的解决方案rq仅使用(并且不再rq_scheduler):

\n\n
    \n
  1. 升级到最新的 python-rq 包:

    \n\n
    # requirements.txt\n...\nrq==1.1.0\n
    Run Code Online (Sandbox Code Playgroud)
  2. \n
  3. 为轮询作业创建专用队列,并相应地对作业进行排队(具有depends_on关系):

    \n\n
    with Connection(redis.from_url(current_app.config[\'REDIS_URL\'])):\n    q = Queue(\'default\')\n    p = Queue(\'pqueue\')\n    job1 = q.enqueue(step1)\n    job2 = p.enqueue(step2, depends_on=job1)  # step2 enqueued in polling queue\n    job3 = q.enqueue(step3, depends_on=job2)\n
    Run Code Online (Sandbox Code Playgroud)
  4. \n
  5. 为轮询队列派生一个专用工作线程。它继承自标准Worker类:

    \n\n
    class PWorker(rq.worker.Worker):\n    def execute_job(self, *args, **kwargs):\n        seconds_between_polls = 65\n        job = args[0]\n        if \'lastpoll\' in job.meta:\n            job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds()\n            if job_timedelta < seconds_between_polls:\n                sleep_period = seconds_between_polls - job_timedelta\n                time.sleep(sleep_period)\n        job.meta[\'lastpoll\'] = datetime.utcnow()\n        job.save_meta()\n\n        super().execute_job(*args, **kwargs)\n
    Run Code Online (Sandbox Code Playgroud)\n\n

    PWorkerexecute_job通过向作业的元数据添加时间戳来扩展该方法\'lastpoll\'

    \n\n

    如果轮询作业进来,并带有lastpoll时间戳,工作人员会检查此后的时间段是否lastpoll大于 65 秒。如果是,则将当前时间写入 \'lastpoll\'并执行轮询。如果没有,它会休眠直到 65 秒结束,然后写入当前时间\'lastpoll\'并执行轮询。没有时间戳的作业lastpoll是第一次轮询,工作人员创建时间戳并执行轮询。

  6. \n
  7. 创建一个专用异常(由任务函数抛出)和一个异常处理程序来处理它:

    \n\n
    # exceptions.py\n\nclass PACError(Exception):\n    pass\n\nclass PACJobRun(PACError):\n    pass\n\nclass PACJobExit(PACError):\n    pass\n
    Run Code Online (Sandbox Code Playgroud)\n\n
    # exception_handlers.py\n\ndef poll_exc_handler(job, exc_type, exc_value, traceback):\n    if exc_type is PACJobRun:\n        requeue_job(job.get_id(), connection=job.connection)\n        return False  # no further exception handling\n    else:\n        return True  # further exception handling\n
    Run Code Online (Sandbox Code Playgroud)\n\n
    # tasks.py\n\ndef step2():\n    # GET request to remote compute job portal API for status\n    # if response == "RUN":\n    raise PACJobRun\n    return True\n
    Run Code Online (Sandbox Code Playgroud)\n\n

    当自定义异常处理程序捕获自定义异常时(这意味着远程计算作业仍在运行),它会在轮询队列中重新排队该作业。

  8. \n
  9. 将自定义异常处理程序放入异常处理层次结构中:

    \n\n
    # manage.py\n\n@cli.command(\'run_pworker\')\ndef run_pworker():\n    redis_url = app.config[\'REDIS_URL\']\n    redis_connection = redis.from_url(redis_url)\n    with rq.connections.Connection(redis_connection):\n        pworker = PWorker(app.config[\'PQUEUE\'], exception_handlers=[poll_exc_handler])\n        pworker.work()\n
    Run Code Online (Sandbox Code Playgroud)
  10. \n
\n\n

这个解决方案的好处是它只用几行额外代码就扩展了 python-rq 的标准功能。另一方面,额外的队列和工作线程 \xe2\x80\xa6 增加了复杂性

\n