如何动态正确关闭Python RQ工作进程?

Jua*_*oto 9 python asynchronous signals process

使用Python RQ,我们正在尝试动态管理工作进程.我们使用定制的工作脚本,(简化形式)如下:

from rq import Connection, Worker

queues_to_listen_on = get_queues_to_listen_on()

with Connection(connection = get_worker_connection()):
    w = Worker(queues_to_listen_on)
    w.work()
Run Code Online (Sandbox Code Playgroud)

我们对关闭工人特别感兴趣.我们主要关注的是如何正常关闭工作人员,以便在关闭之前完成当前工作.在request_stop(...)适当的信号处理程序Worker的对象似乎做什么,我们需要,但似乎没有发射它的方式(至少据我所知),除非它是通过按压CTRL+C在终端上运行的工作进程.

在我看来,有两个可能的解决方案(肯定会有更多) - 按优先顺序排列:

  1. 以编程方式,使用rq库,发送信号request_stop,从而触发正常关机.
  2. 以某种方式获取正确进程的pid(不确定主力进程或工作者监听器进程)并使用其他方法,将适当的信号发送到该进程.我们有一些方法可以做到这一点,但它可能需要更多的工作,并引入其他变量,我宁愿被遗漏的问题(例如,使用Fabric远程命令或沿着这些行的东西).

如果有更好的方法来解决这个问题或者实现相同目标的不同替代方案,我将非常感谢您的建议.

woo*_*ing 5

就设计而言,选项 1 肯定更好。

然而,为了解决必须使用CTRL + C退出进程的特定问题(我也讨厌这个),您可以为您的工作人员使用以下策略:

# WORKER_NAME.py
import os

PID = os.getpid()

@atexit.register
def clean_shut():
    print "Clean shut performed"

    try:
        os.unlink("WORKER_NAME.%d" % PID)
    except:
        pass

# Worker main
def main():
    f = open("WORKER_NAME.%d" % PID, "w")
    f.write("Delete this to end WORKER_NAME gracefully")
    f.close()

    while os.path.exists("WORKER_NAME.%d" % PID):
        # Worker working
Run Code Online (Sandbox Code Playgroud)

在你的主脚本中,按照 @Borys 建议获取工作进程 PID,发送热停止请求,并os.unlink("path/to/WORKER_NAME.%d" % worker_PID)确保正常关闭:)

但这仅适用于运行无限循环的工作人员。如果工作进程调用的事物甚至会阻塞普通的顺序一次性作业,则您必须进一步跟踪可能的阻塞例程以从那里解决问题,例如应用某种超时策略。