如何用 TimedRotatingFileHandler 替换 celery 默认记录器

LJH*_*JHW 2 python logging celery

我一整天都在与这个作斗争。我读过几十篇 stackoverflow 和其他帖子。许多建议都不起作用,而且没有一个直接解决我想做的事情。因此,为了其他挣扎的人的利益,我将发布此内容并回答它。

想象一下你的 celery 工作任务和你的节拍调度器都工作得很好。使用默认的构造方法,您将陷入由以下定义的 2 个日志文件,例如:

celery worker ..... -f ./logs/celeryworker.log
celery beat ..... -f ./logs/celerybeat.log
Run Code Online (Sandbox Code Playgroud)

这些只会无限增长。如何引入旋转日志?

LJH*_*JHW 6

默认的芹菜logging.FileHandlerWatchedFileHandler. 我们想使用TimedRotatingFileHandlerfrom logging.handlersafter_setup_logger因此,我们在任务文件中使用 celery 信号来完成以下任务:

\n\n
from celery.signals import after_setup_logger\n\n@after_setup_logger.connect\ndef replace_handler(**kwargs_):\n    logger = kwargs_[\'logger\'] if \'logger\' in kwargs_ else None\n    if logger and logger.handlers:\n        handler_celery = logger.handlers[-1]\n        handler = TimedRotatingFileHandler(handler_celery.baseFilename, \'midnight\', 1)\n        handler.setFormatter(handler_celery.formatter)\n        logger.handlers[-1] = handler\n
Run Code Online (Sandbox Code Playgroud)\n\n

现在这会重置工作器和节拍记录器。我已经在TimedRotatingFileHandler构造函数中使用 2 分钟间隔对此进行了测试,因此我假设它适用于夜间轮换。

\n\n

编辑1:2019年9月25日

\n\n

感谢 Tom\xc3\xa1\xc5\xa1 Linhart - 我将研究一下logrotate。\n我今晚正在编码,所以我检查了日志确实在一夜之间轮换。\n我还稍微更新了代码以使其更安全。\n我有一个非常简单的设置:并发=1,其中我需要按顺序处理任务的 FIFO 队列,所以也许这个简单的解决方案掩盖了 Tom\xc3\xa1\xc5\xa1 指出的潜在问题。

\n\n

编辑2:2019年9月25日

\n\n

我花了时间研究这个。仍然存在几个问题:WatchedFileHandler仅是 Linux 并且仍然会受到竞争条件的影响,而logrotate其本身也可能会受到竞争条件的影响。https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes解释了多进程日志记录和https://docs.python.org /3/howto/logging-cookbook.html解释了如何使用QueueListener而不是创建自己的multiprocessing.Process. 将所有这些放在一起,我为以下内容创建了一个多进程类包装器TimedRotatingFileHandler

\n\n
import logging\nfrom logging.handlers import QueueHandler, QueueListener, TimedRotatingFileHandler\nfrom multiprocessing import Queue as MQueue\nimport multiprocessing.queues\nfrom celery.signals import after_setup_logger\nfrom atexit import register\n\nclass QueuedTimedRotatingFileHandler:\n    instance = None\n\n    def __init__(self, filename_, formatter_):\n        self.queue = MQueue(-1)\n        handler = TimedRotatingFileHandler(filename_, \'midnight\', 1)\n        handler.setFormatter(formatter_)\n        self.listener = QueueListener(self.queue, handler)\n        self.listener.start()\n        register(self.stop)\n\n    def stop(self):\n        self.listener.stop()\n\n@after_setup_logger.connect\ndef replace_handler(**kwargs_):\n    logger = kwargs_[\'logger\'] if \'logger\' in kwargs_ else None\n    if logger and logger.handlers:\n        handler_celery = logger.handlers[-1]\n        if QueuedTimedRotatingFileHandler.instance is None:\n            QueuedTimedRotatingFileHandler.instance = \\\n                QueuedTimedRotatingFileHandler(\n                    handler_celery.baseFilename,\n                    handler_celery.formatter\n                )\n        handler = QueueHandler(QueuedTimedRotatingFileHandler.instance.queue)\n        logger.handlers[-1] = handler\n
Run Code Online (Sandbox Code Playgroud)\n\n

现在感觉这是最安全、最优雅的跨平台解决方案。欢迎任何评论。

\n\n

编辑3:2019年9月26日

\n\n

我认为最后一次更改已经更改了上面的代码。需要像单例一样构造包装器,因为 celery Worker 可以多次解析文件。

\n\n

编辑4:2019年2月10日

\n\n

最后一个改变。停止监听器需要注册atexit才能正确触发。

\n