Python multiprocessing + logging hangs

Asked by jak*_*anq (score 5) — tags: python, multiprocessing, python-multiprocessing, python-logging

I'm trying to configure the Python logging framework to handle log messages coming from a multiprocessing.Pool. In most cases the script hangs indefinitely, though I've observed some other behaviour as well, such as exiting without printing all of the log messages.

My actual code is more complex, but I've reduced it to the following script, which breaks fairly reliably on the machines I've tested it on.

#!/usr/bin/env python3

import logging
import logging.handlers
import multiprocessing
import multiprocessing.util

L = logging.getLogger(__name__)

_globalQueue = None
_globalListener = None

def basicsetup():
    global _globalQueue
    global _globalListener

    cf = logging.Formatter("[{levelname}] {created:.7f} {name} ({process}~{processName}): {message}", style="{")

    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(cf)

    # Subprocesses should use the queue to send log messages back to a thread in the main process
    _globalQueue = multiprocessing.Queue()

    _globalListener = logging.handlers.QueueListener(_globalQueue, handler, respect_handler_level=True)
    _globalListener.start()

    # Configure logging for main thread
    process_setup(get_queue())

def get_queue():
    return _globalQueue

def process_setup(queue):
    handler = logging.handlers.QueueHandler(queue)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)



def do_work(i):
    # Do something that involves logging
    # If nothing is logged, it works fine
    L.info("Hello {} from MP".format(i))

if __name__ == "__main__":
    # Also fails with other start methods, but this is what I'm using in the actual application
    multiprocessing.set_start_method("spawn")
    # Optional, but more debugging info
    multiprocessing.util.log_to_stderr()
    # Configure logging
    basicsetup()

    # Set up multiprocessing pool, initialising logging in each subprocess
    with multiprocessing.Pool(initializer=process_setup, initargs=(get_queue(),)) as pl:
        # 100 seems to work fine, 500 fails most of the time.
        # If you're having trouble reproducing the error, try bumping this number up to 1000
        pl.map(do_work, range(500))

    if _globalListener is not None:
        # Stop the listener and join the thread it runs on.
        # If we don't do this, we may lose log messages when we exit.
        _globalListener.stop()

The idea of the script is to use a multiprocessing queue together with the standard logging QueueListener and QueueHandler classes to handle logging from the subprocesses.
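For reference, the handoff that QueueHandler and QueueListener implement can be demonstrated in a single process, where a plain thread-safe queue.Queue is sufficient (QueueListener accepts any queue-like object). This is a minimal illustrative sketch; `demo_queue_logging` and the "demo" logger name are not from the script above:

```python
import io
import logging
import logging.handlers
import queue

def demo_queue_logging():
    """Route one record through a queue to a StreamHandler and return the output."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("[{levelname}] {name}: {message}", style="{"))

    # A thread-safe queue is enough when everything lives in one process.
    q = queue.Queue()
    listener = logging.handlers.QueueListener(q, handler)
    listener.start()

    # The logger only ever sees the cheap QueueHandler; the real handler
    # runs on the listener's background thread.
    logger = logging.getLogger("demo")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    logger.addHandler(logging.handlers.QueueHandler(q))

    logger.info("Hello from the queue")
    listener.stop()  # joins the listener thread, flushing pending records
    return stream.getvalue()
```

Calling `demo_queue_logging()` returns the formatted record, e.g. a line containing `[INFO] demo: Hello from the queue`.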

Expected behaviour - the script should log "Hello X from MP" 500 times (once per task).

Actual behaviour - at some point (it varies between runs, and sometimes doesn't happen at all) the program hangs indefinitely. Pressing Ctrl+C produces a traceback and some log messages, then pressing Ctrl+C again kills the script with another traceback.

Example output from a failed run (highly non-deterministic, but it usually looks something like this):

# First several hundred lines of logs removed - as far as I can tell, there's not much of interest missing.

[INFO] 1590652696.6525624 __mp_main__ (72404~SpawnPoolWorker-4): Hello 456 from MP
[INFO] 1590652696.6525996 __mp_main__ (72404~SpawnPoolWorker-4): Hello 457 from MP
[INFO] 1590652696.6526365 __mp_main__ (72404~SpawnPoolWorker-4): Hello 458 from MP
[INFO] 1590652696.6526761 __mp_main__ (72404~SpawnPoolWorker-4): Hello 459 from MP
[INFO] 1590652696.6527176 __mp_main__ (72404~SpawnPoolWorker-4): Hello 460 from MP
[INFO] 1590652696.6527598 __mp_main__ (72404~SpawnPoolWorker-4): Hello 461 from MP
^CTraceback (most recent call last):
  File "./test_logging.py", line 62, in <module>
    _globalListener.stop()
  File "/usr/lib/python3.8/logging/handlers.py", line 1508, in stop
    self._thread.join()
  File "/usr/lib/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] running the remaining "atexit" finalizers
[DEBUG/MainProcess] joining queue thread
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 195, in _finalize_join
    thread.join()
  File "/usr/lib/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt


System configuration: I have reproduced this on Python 3.8.3 (as of today) on Arch Linux and on Python 3.7.7 on Fedora 30, across 3 different machines.

Any insight into this problem would be much appreciated - I've been scratching my head over it for a while now.

Answered by 小智 (score 1)

I've run into this problem too. One possible workaround is to use a multiprocessing queue for the log messages, with all of the workers passing their log messages to a single process that does the actual logging. The Python loguru module abstracts this setup away, and its log handlers can be made multiprocessing-safe by passing the enqueue=True argument.
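The single-logging-process pattern this answer describes can be sketched with the standard library alone (it is also roughly what loguru's enqueue=True arranges for you internally). This is an illustrative sketch, not the answerer's code: `listener_process`, `worker_setup`, and `run_demo` are made-up names, and the "fork" start method (POSIX-only) is assumed to keep the demo self-contained:

```python
import logging
import logging.handlers
import multiprocessing

def listener_process(queue, logfile):
    """Dedicated process: pull records off the queue and write them to a file."""
    handler = logging.FileHandler(logfile)
    handler.setFormatter(logging.Formatter("[{levelname}] {name}: {message}", style="{"))
    logging.getLogger().addHandler(handler)
    while True:
        record = queue.get()
        if record is None:  # sentinel from the main process: shut down
            break
        logging.getLogger(record.name).handle(record)

def worker_setup(queue):
    """Pool initializer: workers only ever talk to the queue, never to real handlers."""
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.handlers[:] = [logging.handlers.QueueHandler(queue)]

def do_work(i):
    logging.getLogger("worker").info("Hello %d from MP", i)

def run_demo(logfile, n=50):
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX system
    queue = ctx.Queue()
    listener = ctx.Process(target=listener_process, args=(queue, logfile))
    listener.start()
    pool = ctx.Pool(2, initializer=worker_setup, initargs=(queue,))
    pool.map(do_work, range(n))
    pool.close()
    pool.join()          # workers flush their queue feeder threads on exit
    queue.put(None)      # only now is it safe to tell the listener to stop
    listener.join()
```

Because the sentinel is only enqueued after the pool has been joined, the listener drains every record before it exits; `run_demo("out.log", 20)` should leave exactly 20 "Hello" lines in the file.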