How should I log while using multiprocessing in Python?

cdl*_*ary 213 python logging multiprocessing

Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is module-level multiprocessing-aware logging: LOG = multiprocessing.get_logger(). Per the docs, this logger has process-shared locks so that you don't garble things in sys.stderr (or whatever file handle) by having multiple processes write to it simultaneously.

The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I would need to make all dependencies of this central module use multiprocessing-aware logging. That's annoying within the framework, to say nothing of all the clients of the framework. Are there alternatives I'm not thinking of?

zzz*_*eek 118

I just now wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well.

(Note: this is hardcoded to RotatingFileHandler, which is my own use case.)


Update: @javier now maintains this approach as a package available on PyPI - see multiprocessing-logging on PyPI, with the source on GitHub at https://github.com/jruere/multiprocessing-logging


Update: Implemented!

This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've been using this in production for several months now, and the current version below works without issue.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
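A minimal usage sketch, purely my own assumption of how the handler is meant to be wired up (the answer itself doesn't show it): attach the handler to the root logger in the parent before starting workers, so that fork-based children inherit the already-configured logger. As the comments below note, this relies on the fork start method and will not work with "spawn"; the file name and sizes are placeholders.

import logging
import multiprocessing

def worker(i):
    # the root logger (with MultiProcessingLog attached) is inherited on fork
    logging.getLogger().info('hello from worker %d', i)

if __name__ == '__main__':
    # file name, mode, max size and backup count are placeholder values
    mp_handler = MultiProcessingLog('app.log', 'a', 10 * 1024 * 1024, 5)
    mp_handler.setFormatter(logging.Formatter('%(asctime)s %(processName)s %(message)s'))
    root = logging.getLogger()
    root.addHandler(mp_handler)
    root.setLevel(logging.INFO)

    procs = [multiprocessing.Process(target=worker, args=(i,)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()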

  • @zzzeek, this solution is good, but I couldn't find a package containing it or anything similar, so I created one called `multiprocessing-logging`. (10 upvotes)
  • Unfortunately, this approach doesn't work on Windows. From http://docs.python.org/library/multiprocessing.html section 16.6.2.12: "Note that on Windows child processes will only inherit the level of the parent process's logger – any other customization of the logger will not be inherited." Subprocesses won't inherit the handler, and you can't pass it explicitly because it isn't picklable. (9 upvotes)
  • The handler above does all the file writing from the parent process and uses just one thread to receive messages passed from the child processes. If you invoke the handler itself from a spawned child process, then you're using it incorrectly and you'll get all the same problems as RotatingFileHandler. I've used the code above for years with no issue. (4 upvotes)
  • Could you add a simple example showing initialization, as well as usage from a hypothetical child process? I'm not quite sure how the child process is supposed to get access to the queue without instantiating another instance of the class. (4 upvotes)
  • Worth noting that `multiprocessing.Queue` uses a thread for `put()`. So don't call `put` (i.e. log a message with the `MultiProcessingLog` handler) before creating all subprocesses; otherwise that thread will be dead in the child process. One fix is to call `Queue._after_fork()` at the start of each child process, or to use `multiprocessing.queues.SimpleQueue` instead, which involves no threads but is blocking. (2 upvotes)
  • @zzzeek thanks for the quick reply. After playing with it a bit more, it seems this currently only works on Linux (and possibly Mac, I haven't tried). I've created a gist of what I'm running on both platforms along with the resulting output [here](https://gist.github.com/JesseBuesking/10674086). Do you happen to know what modifications are needed to make it run on both Linux and Windows? This is my first attempt at multiprocessing with Python, which is why I'm running into this. (2 upvotes)
  • This won't work when the multiprocessing context is set to "spawn", right? In the "spawn" context, new processes are brand-new Python interpreters rather than forks of the existing process, so they don't inherit anything from the main process. (2 upvotes)
  • @EricFrechette I just checked, and this solution does not work if the context is set to "spawn". (2 upvotes)

vla*_*adr 62

The only way to deal with this non-intrusively is to:

  1. Spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped.
  2. Your controller process can then do one of the following:
    • If using disk files: coalesce the log files at the end of the run, sorted by timestamp
    • If using pipes (recommended): coalesce log entries on-the-fly from all the pipes into a central log file (e.g., periodically select from the pipes' file descriptors, perform a merge-sort on the available log entries, and flush to the centralized log; repeat). A rough sketch of this pipe-based approach is shown right below this list.
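
A minimal sketch of that pipe-based coalescing, purely as my own illustration (not the answer author's code). It assumes the fork start method on a POSIX system and line-oriented, timestamp-prefixed entries:

import os
import select
import sys
import time
from multiprocessing import Process

def worker(write_fd, worker_id):
    # each worker writes timestamp-prefixed, line-oriented entries to its own pipe
    with os.fdopen(write_fd, 'w', buffering=1) as out:
        for i in range(3):
            out.write('%f worker-%d message %d\n' % (time.time(), worker_id, i))
            time.sleep(0.01)

if __name__ == '__main__':
    readers, procs = [], []
    for wid in range(4):
        r, w = os.pipe()
        p = Process(target=worker, args=(w, wid))
        p.start()
        os.close(w)                      # parent keeps only the read end
        procs.append(p)
        readers.append(os.fdopen(r))

    entries, open_readers = [], set(readers)
    while open_readers:
        ready, _, _ = select.select(list(open_readers), [], [])
        for f in ready:
            line = f.readline()
            if line:
                entries.append(line)
            else:                        # EOF: that worker closed its write end
                open_readers.discard(f)
                f.close()

    for p in procs:
        p.join()

    # merge by the leading timestamp and flush to the central log (stdout here)
    for line in sorted(entries, key=lambda l: float(l.split(None, 1)[0])):
        sys.stdout.write(line)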

  • Why not just use a multiprocessing.Queue and a logging thread in the main process? Seems simpler. (20 upvotes)
  • @BrandonRhodes - As I said, *non-intrusively*. Using `multiprocessing.Queue` will not be simpler if there is a lot of code to rewire to use it, and/or if [performance is an issue](http://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue). (2 upvotes)

fan*_*ous 22

The Python logging cookbook has two complete examples: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

It uses QueueHandler, which is new in Python 3.2, but easy to copy into your own code (as I did myself in Python 2.7) from: https://gist.github.com/vsajip/591589

Each process puts its log records on the Queue via a QueueHandler, and then a listener thread or process (one example of each is provided) picks them up and writes them all to a file - no risk of corruption or garbling.
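
As a condensed sketch of the cookbook pattern (the variant with a separate listener process; the file name, level, and format are placeholders of mine, not from the cookbook):

import logging
import logging.handlers
import multiprocessing

def listener(queue):
    # the only process that touches the log file
    root = logging.getLogger()
    root.addHandler(logging.FileHandler('mp.log'))   # file name is a placeholder
    while True:
        record = queue.get()
        if record is None:                           # sentinel from the main process
            break
        logging.getLogger(record.name).handle(record)

def worker(queue, n):
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(queue))  # only handler needed here
    root.setLevel(logging.INFO)
    root.info('message %d from worker', n)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    lp = multiprocessing.Process(target=listener, args=(queue,))
    lp.start()
    workers = [multiprocessing.Process(target=worker, args=(queue, i)) for i in range(5)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    queue.put(None)   # tell the listener to finish
    lp.join()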

  • This should be the accepted answer, at least since `QueueHandler` appeared. It's non-intrusive and transparent, and it works regardless of what logger configuration the main process uses. The workers always log to their configured `QueueHandler` and don't expect any kind of logger configuration to be passed down from the parent process to the spawned children. (3 upvotes)

Ali*_*har 20

Yet another alternative might be the various non-file-based logging handlers in the logging package:

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(and others)

This way, you could easily have a logging daemon somewhere that you can safely write to and that handles the results correctly (e.g., a simple socket server that just unpickles the messages and emits them to its own rotating file handler).

A SyslogHandler would take care of that for you, too. And of course you could use your own instance of syslog rather than the system one.
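
For illustration, here is a rough sketch of the socket variant, modeled on the standard logging cookbook's network-logging receiver, plus the one-line worker-side setup. The host, port, and file name are assumptions, not part of the original answer:

import logging
import logging.handlers
import pickle
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    # receives length-prefixed, pickled LogRecord dicts (SocketHandler's wire
    # format) and re-dispatches them to whatever handlers this process has
    def handle(self):
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            data = self.connection.recv(slen)
            while len(data) < slen:
                data += self.connection.recv(slen - len(data))
            record = logging.makeLogRecord(pickle.loads(data))
            logging.getLogger(record.name).handle(record)

def run_log_server():
    # the log daemon: configure a file (or rotating) handler here, then serve
    logging.basicConfig(filename='central.log', level=logging.DEBUG)
    server = socketserver.ThreadingTCPServer(
        ('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT),
        LogRecordStreamHandler)
    server.serve_forever()

# in every worker/child process, the only logging setup needed is:
#   handler = logging.handlers.SocketHandler(
#       'localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
#   logging.getLogger().addHandler(handler)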


use*_*814 16

Here's another solution, with a focus on simplicity for anyone else (like me) who gets here from Google. Logging should be easy! Python 3.2 or higher only.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()

  • The `QueueHandler` and `QueueListener` classes can also be used on Python 2.7; they're available in the [`logutils`](https://pythonhosted.org/logutils/queue.html) package. (2 upvotes)
  • The main process's logger should also use a QueueHandler. In your current code, the main process bypasses the queue, so there can be race conditions between the main process and the worker processes. Everything should log to the queue (via a QueueHandler), and only the QueueListener should be allowed to log to the StreamHandler. (2 upvotes)

小智 13

A variant of the others that keeps the logging and queue thread separate.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not reliable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)


Iop*_*eam 13

As of 2020 there seems to be a simpler way of logging with multiprocessing.

This function will create the logger. You can set the format here, as well as where you want the output to go (file, stdout):

def create_logger():
    import multiprocessing, logging
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(\
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    handler = logging.FileHandler('logs/your_file_name.log')
    handler.setFormatter(formatter)

    # this bit will make sure you won't have 
    # duplicated messages in the output
    if not len(logger.handlers): 
        logger.addHandler(handler)
    return logger


In your init, you instantiate the logger:

if __name__ == '__main__': 
    from multiprocessing import Pool
    logger = create_logger()
    logger.info('Starting pooling')
    p = Pool()
    # rest of the code

Now, all you need to do is add this reference in each function where you need logging:

logger = create_logger()

And output the messages:

logger.info(f'My message from {something}')

Hope this helps.

  • This seems like the most straightforward solution now. Note that the "if not len(logger.handlers)" part assumes you will use a single handler. If you want more than one handler, e.g. sending all messages to a file but only INFO and above to stdout, you'll need to adjust that part. (2 upvotes)
  • Normally you have vast amounts of code that just does `import logging` and then uses things like `logging.info("whatever")` - there's nowhere to pass a logger object to anything, and no chance you can retrofit that code. (2 upvotes)
  • This works, but it isn't very flexible. For example, once you put create_logger() into all your functions, there's no way to turn logging off in case someone else wants to use your library in their own application. The best practice for libraries is never to force anyone to see log messages. (2 upvotes)

sch*_*mar 10

All the current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:

  • You can use any logging configuration you want
  • Logging is done in a daemon thread
  • Safe shutdown of the daemon by using a context manager
  • Communication with the logging thread is done via multiprocessing.Queue
  • In subprocesses, logging.Logger (and already defined instances) are patched to send all records to the queue
  • New: the traceback and message are formatted before being sent to the queue, to prevent pickling errors

Code with a usage example and output can be found at the following gist: https://gist.github.com/schlamar/7003737
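
A compressed sketch of that architecture as I understand it from the description above (this is my paraphrase, not the gist itself; the names are invented, and it approximates the logger patching with a handler installed in the child processes):

import logging
import multiprocessing
import threading
from contextlib import contextmanager

class QueueForwardingHandler(logging.Handler):
    """Installed in child processes: ships every record to the main process."""
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def emit(self, record):
        # pre-format message and traceback so the record stays picklable
        record.msg = record.getMessage()
        record.args = None
        if record.exc_info:
            record.exc_text = logging.Formatter().formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)

def _listen(queue):
    # daemon thread in the main process: hand records to the existing logging config
    while True:
        record = queue.get()
        if record is None:              # sentinel -> shut down
            break
        logging.getLogger(record.name).handle(record)

@contextmanager
def multiprocessing_logging():
    """Context manager owning the queue and the daemon logging thread."""
    queue = multiprocessing.Queue()
    thread = threading.Thread(target=_listen, args=(queue,), daemon=True)
    thread.start()
    try:
        yield queue                     # pass this to worker initializers / Process args
    finally:
        queue.put(None)
        thread.join()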


Sam*_*uel 7

Since we can represent multiprocess logging as many publishers and one subscriber (listener), using ZeroMQ to implement PUB-SUB messaging is indeed an option.

Moreover, the PyZMQ module, the Python bindings for ZMQ, implements PUBHandler, an object for publishing log messages over a zmq.PUB socket.

There's a solution on the web for centralized logging from distributed applications using PyZMQ and PUBHandler, which can easily be adopted locally for multiple publishing processes.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

    @property
    def sub(self):
        return self._sub

    @property
    def logger(self):
        return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

    def run(self):
        self._logger.info("{0} - Sending message".format(self.proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting the subscriber process so we won't lose the publishers' messages
sub_logger_process = Process(target=run_sub_logger,
                             args=('127.0.0.1', stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()


Mik*_*ler 6

I also liked zzzeek's answer, but Andre is correct that a queue is required to prevent garbling. I had some luck with the pipes, but did see garbling, which is somewhat expected. Implementing it turned out to be harder than I thought, particularly because of running on Windows, where there are some additional restrictions about global variables and such (see: How's Python Multiprocessing Implemented on Windows?).

But I finally got it working. This example probably isn't perfect, so comments and suggestions are welcome. It also does not support setting the formatter or anything other than the root logger. Basically, you have to reinitialize the logger in each of the pool processes with the queue and set up the other attributes on the logger.

Again, any suggestions on how to make the code better are welcome. I certainly don't know all the Python tricks :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()


小智 5

I'd like to suggest using the logger_tt library: https://github.com/Dragon2fly/logger_tt

The multiprocessing-logging library couldn't be made to work on my macOS, but logger_tt could.