如何创建Celery Windows服务?

Vit*_*con 10 python windows windows-services celery

我正在尝试创建一个Windows服务来启动Celery.我遇到过一篇使用任务计划程序执行此操作的文章.然而,它似乎推出了许多芹菜实例并且在机器死亡之前不断消耗内存.有没有办法将其作为Windows服务启动?

Vit*_*con 12

我从另一个网站得到了答案.Celeryd(芹菜守护服务)中运行的应用程序贴膜,搜索"帕斯特Windows服务"导致我在这里.它描述了如何将Pylons应用程序作为Windows服务运行.作为paster框架和托管python Web服务的新手,我最初并没有想到要检查它.但是这个解决方案适用于Celery,在脚本中有一点点变化.

我修改了脚本,以便更容易修改Celery设置.基本的变化是:

  1. 使用Celery服务的设置创建一个INI文件(如下所示)
  2. 创建一个python脚本来创建Windows服务.

INI文件设置(celeryd.ini):

[celery:service]
service_name = CeleryService
service_display_name = Celery Service
service_description = WSCGI Windows Celery Service
service_logfile = celeryd.log
Run Code Online (Sandbox Code Playgroud)

用于创建Windows服务的Python脚本(CeleryService.py):

"""
The most basic (working) Windows service possible.
Requires Mark Hammond's pywin32 package.  
Most of the code was taken from a  CherryPy 2.2 example of how to set up a service
"""
import pkg_resources
import win32serviceutil
from paste.script.serve import ServeCommand as Server
import os, sys
import ConfigParser

import win32service
import win32event

SCRIPT_DIR          = os.path.abspath(os.path.dirname(__file__))
INI_FILE            = 'celeryd.ini'
SERV_SECTION        = 'celery:service'
SERV_NAME           = 'service_name'
SERV_DISPLAY_NAME   = 'service_display_name'
SERV_DESC           = 'service_description'
SERV_LOG_FILE       = 'service_logfile'
SERV_APPLICATION    = 'celeryd'
SERV_LOG_FILE_VAR   = 'CELERYD_LOG_FILE'

# Default Values
SERV_NAME_DEFAULT           = 'CeleryService'
SERV_DISPLAY_NAME_DEFAULT   = 'Celery Service'
SERV_DESC_DEFAULT           = 'WSCGI Windows Celery Service'
SERV_LOG_FILE_DEFAULT       = r'D:\logs\celery.log'

class DefaultSettings(object):
    def __init__(self):
        if SCRIPT_DIR:
            os.chdir(SCRIPT_DIR)
        # find the ini file
        self.ini = os.path.join(SCRIPT_DIR,INI_FILE)
        # create a config parser opject and populate it with the ini file
        c = ConfigParser.SafeConfigParser()
        c.read(self.ini)
        self.c = c

    def getDefaults(self):
        '''
        Check for and get the default settings
        '''
        if (
            (not self.c.has_section(SERV_SECTION)) or
            (not self.c.has_option(SERV_SECTION, SERV_NAME)) or
            (not self.c.has_option(SERV_SECTION, SERV_DISPLAY_NAME)) or
            (not self.c.has_option(SERV_SECTION, SERV_DESC)) or
            (not self.c.has_option(SERV_SECTION, SERV_LOG_FILE))
            ):
            print 'setting defaults'
            self.setDefaults()
        service_name = self.c.get(SERV_SECTION, SERV_NAME)
        service_display_name = self.c.get(SERV_SECTION, SERV_DISPLAY_NAME)
        service_description = self.c.get(SERV_SECTION, SERV_DESC)
        iniFile = self.ini
        service_logfile = self.c.get(SERV_SECTION, SERV_LOG_FILE)
        return service_name, service_display_name, service_description, iniFile, service_logfile

    def setDefaults(self):
        '''
        set and add the default setting to the ini file
        '''
        if not self.c.has_section(SERV_SECTION):
            self.c.add_section(SERV_SECTION)
        self.c.set(SERV_SECTION, SERV_NAME, SERV_NAME_DEFAULT)
        self.c.set(SERV_SECTION, SERV_DISPLAY_NAME, SERV_DISPLAY_NAME_DEFAULT)
        self.c.set(SERV_SECTION, SERV_DESC, SERV_DESC_DEFAULT)
        self.c.set(SERV_SECTION, SERV_LOG_FILE, SERV_LOG_FILE_DEFAULT)
        cfg = file(self.ini, 'wr')
        self.c.write(cfg)
        cfg.close()
        print '''
you must set the celery:service section service_name, service_display_name,
and service_description options to define the service 
in the %s file
''' % self.ini
        sys.exit()


class CeleryService(win32serviceutil.ServiceFramework):
    """NT Service."""

    d = DefaultSettings()
    service_name, service_display_name, service_description, iniFile, logFile = d.getDefaults()

    _svc_name_ = service_name
    _svc_display_name_ = service_display_name
    _svc_description_ = service_description

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        # create an event that SvcDoRun can wait on and SvcStop
        # can set.
        self.stop_event = win32event.CreateEvent(None, 0, 0, None)

    def SvcDoRun(self):
        os.chdir(SCRIPT_DIR)
        s = Server(SERV_APPLICATION)
        os.environ[SERV_LOG_FILE_VAR] = self.logFile
        s.run([self.iniFile])
        win32event.WaitForSingleObject(self.stop_event, win32event.INFINITE)

    def SvcStop(self):
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        #win32event.SetEvent(self.stop_event)
        self.ReportServiceStatus(win32service.SERVICE_STOPPED)
        sys.exit()

if __name__ == '__main__':
    win32serviceutil.HandleCommandLine(CeleryService)
Run Code Online (Sandbox Code Playgroud)

安装服务运行python CeleryService.py install然后python CeleryService.py start启动服务.注意:这些命令应在具有管理员权限的命令行中运行.

如果需要删除该服务,请运行python CeleryService.py remove.

我试图托管Celery作为增强我的RhodeCode安装的一部分.这个解决方案似乎有效.希望这会对某人有所帮助.


aza*_*lea 5

接受的答案不适用于使用Django应用程序运行celery。但这启发了我想出一个解决方案,用于将celery作为Django的Windows服务运行。请注意,以下内容仅适用于Django项目。经过一些修改,它可以与其他应用程序一起使用。

以下讨论假定Python> = 3.6和RabbitMQ已安装并rabbitmq-server在上运行localhost

在Django项目的顶级文件夹中创建一个文件celery_service.py(或您喜欢的任何文件),与manage.py处于同一级别,其内容如下:

'''Usage : python celery_service.py install (start / stop / remove)
Run celery as a Windows service
'''
import win32service
import win32serviceutil
import win32api
import win32con
import win32event
import subprocess
import sys
import os
from pathlib import Path
import shlex
import logging
import time

# The directory for celery.log and celery_service.log
# Default: the directory of this script
INSTDIR = Path(__file__).parent
# The path of python Scripts
# Usually it is in path_to/venv/Scripts.
# If it is already in system PATH, then it can be set as ''
PYTHONSCRIPTPATH = INSTDIR / 'venvcelery/Scripts'
# The directory name of django project
# Note: it is the directory at the same level of manage.py
# not the parent directory
PROJECTDIR = 'proj'

logging.basicConfig(
    filename = INSTDIR / 'celery_service.log',
    level = logging.DEBUG, 
    format = '[%(asctime)-15s: %(levelname)-7.7s] %(message)s'
)

class CeleryService(win32serviceutil.ServiceFramework):

    _svc_name_ = "Celery"
    _svc_display_name_ = "Celery Distributed Task Queue Service"

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)           

    def SvcStop(self):
        logging.info('Stopping {name} service ...'.format(name=self._svc_name_))        
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.hWaitStop)
        self.ReportServiceStatus(win32service.SERVICE_STOPPED)
        sys.exit()           

    def SvcDoRun(self):
        logging.info('Starting {name} service ...'.format(name=self._svc_name_))
        os.chdir(INSTDIR) # so that proj worker can be found
        logging.info('cwd: ' + os.getcwd())
        self.ReportServiceStatus(win32service.SERVICE_RUNNING)
        command = '"{celery_path}" -A {proj_dir} worker -f "{log_path}" -l info -P eventlet'.format(
            celery_path=PYTHONSCRIPTPATH / 'celery.exe',
            proj_dir=PROJECTDIR,
            log_path=INSTDIR / 'celery.log')
        logging.info('command: ' + command)
        args = shlex.split(command)
        proc = subprocess.Popen(args)
        logging.info('pid: {pid}'.format(pid=proc.pid))
        self.timeout = 3000
        while True:
            rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout)
            if rc == win32event.WAIT_OBJECT_0:
                # stop signal encountered
                # terminate process 'proc'
                PROCESS_TERMINATE = 1
                handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
                win32api.TerminateProcess(handle, -1)
                win32api.CloseHandle(handle)                
                break

if __name__ == '__main__':
   win32serviceutil.HandleCommandLine(CeleryService)
Run Code Online (Sandbox Code Playgroud)

在运行脚本之前,您需要

  1. (可选)创建python虚拟环境,例如“ venvcelery”。

  2. 安装以下要求:

    django> = 2.0.0 sqlalchemy> = 1.0.14 celery> = 4.3.0,<5.0 pywin32> = 227 eventlet> = 0.25

  3. 修复pywin32的pywintypes36.dll位置。参考

  4. 在celery_service.py中正确设置PYTHONSCRIPTPATH和PROJECTDIR

PYTHONSCRIPTPATH通常是python安装路径或当前虚拟环境下的“脚本”文件夹

PROJECTDIR是Django项目的目录名称。

它是与manage.py处于同一级别的目录,而不是父目录。

现在,您可以使用以下方法安装/启动/停止/删除服务:

python celery_service.py install
python celery_service.py start
python celery_service.py stop
python celery_service.py remove
Run Code Online (Sandbox Code Playgroud)

我创建了一个演示Django项目,其中celery作为Windows服务运行:

https://github.com/azalea/django_celery_windows_service

如果您对正在运行的示例感兴趣。


注意:这是更新的版本,假设Python> = 3.6,Django 2.2和Celery 4。

可以在编辑历史记录中查看具有Python 2.7,Django 1.6和Celery 3的旧版本。