PySpark从执行程序登录

Cho*_*eat 15 python log4j apache-spark pyspark

在执行程序上使用pyspark访问Spark的log4j记录器的正确方法是什么?

在驱动程序中这样做很容易,但我似乎无法理解如何访问执行程序上的日志记录功能,以便我可以在本地登录并让YARN收集本地日志.

有没有办法访问本地记录器?

标准的日志记录过程是不够的,因为我无法从执行程序访问spark上下文.

Mar*_*usz 24

您不能在执行程序上使用本地log4j记录器.执行者生成的Python工作者jvms没有与java的"回调"连接,他们只接收命令.但是有一种方法可以使用标准的python日志记录从执行程序登录并通过YARN捕获它们.

在你HDFS上放置python模块文件,它为每个python worker配置一次日志记录和代理日志功能(命名它logger.py):

import os
import logging
import sys

class YarnLogger:
    @staticmethod
    def setup_logger():
        if not 'LOG_DIRS' in os.environ:
            sys.stderr.write('Missing LOG_DIRS environment variable, pyspark logging disabled')
            return 

        file = os.environ['LOG_DIRS'].split(',')[0] + '/pyspark.log'
        logging.basicConfig(filename=file, level=logging.INFO, 
                format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')

    def __getattr__(self, key):
        return getattr(logging, key)

YarnLogger.setup_logger()
Run Code Online (Sandbox Code Playgroud)

然后在您的应用程序中导入此模块:

spark.sparkContext.addPyFile('hdfs:///path/to/logger.py')
import logger
logger = logger.YarnLogger()
Run Code Online (Sandbox Code Playgroud)

您可以在普通日志库中使用pyspark函数内部:

def map_sth(s):
    logger.info("Mapping " + str(s))
    return s

spark.range(10).rdd.map(map_sth).count()
Run Code Online (Sandbox Code Playgroud)

pyspark.log会在资源管理器中可见,将在应用程序完成收集,所以你以后可以访问这些日志yarn logs -applicationId ..... 在此输入图像描述

  • 如果有人不确定您是否可以通过执行“yarn messages -applicationId <app_id> -log_files pyspark.log”单独访问“pyspark.log” (2认同)

Oli*_* W. 8

请注意,Mariusz的答案会返回日志记录模块的代理.当您的日志记录需求非常基本时,这可以工作(upvoted).一旦您对配置多个记录器实例或使用多个处理程序这样的事情感兴趣,它就会缺乏.例如,如果您有一组更大的代码,您只想在调试时运行,其中一个解决方案是检查记录器实例的isEnabledFor方法,如下所示:

logger = logging.getLogger(__name__)
if logger.isEnabledFor(logging.DEBUG):
    # do some heavy calculations and call `logger.debug` (or any other logging method, really)
Run Code Online (Sandbox Code Playgroud)

当在日志记录模块上调用该方法时,这会失败,就像在Mariusz的答案中一样,因为日志记录模块没有这样的属性.

解决此问题的一种方法是创建一个spark_logging.py模块,在其中配置日志记录并返回新的实例Logger.下面的代码显示了一个示例,它使用配置日志记录dictConfig.它还添加了一个过滤器,以便在使用根记录器时大大减少来自所有工作节点的重复次数(过滤器示例来自Christopher Dunn(参考)).

# spark_logging.py
import logging
import logging.config
import os
import tempfile
from logging import *  # gives access to logging.DEBUG etc by aliasing this module for the standard logging module


class Unique(logging.Filter):
    """Messages are allowed through just once.
    The 'message' includes substitutions, but is not formatted by the
    handler. If it were, then practically all messages would be unique!
    """
    def __init__(self, name=""):
        logging.Filter.__init__(self, name)
        self.reset()

    def reset(self):
        """Act as if nothing has happened."""
        self.__logged = {}

    def filter(self, rec):
        """logging.Filter.filter performs an extra filter on the name."""
        return logging.Filter.filter(self, rec) and self.__is_first_time(rec)

    def __is_first_time(self, rec):
        """Emit a message only once."""
        msg = rec.msg %(rec.args)
        if msg in self.__logged:
            self.__logged[msg] += 1
            return False
        else:
            self.__logged[msg] = 1
            return True


def getLogger(name, logfile="pyspark.log"):
    """Replaces getLogger from logging to ensure each worker configures
    logging locally."""

    try:
        logfile = os.path.join(os.environ['LOG_DIRS'].split(',')[0], logfile)
    except (KeyError, IndexError):
        tmpdir = tempfile.gettempdir()
        logfile = os.path.join(tmpdir, logfile)
        rootlogger = logging.getLogger("")
        rootlogger.addFilter(Unique())
        rootlogger.warning(
            "LOG_DIRS not in environment variables or is empty. Will log to {}."
            .format(logfile))

    # Alternatively, load log settings from YAML or use JSON.
    log_settings = {
        'version': 1,
        'disable_existing_loggers': False,
        'handlers': {
            'file': {
                'class': 'logging.FileHandler',
                'level': 'DEBUG',
                'formatter': 'detailed',
                'filename': logfile
            },
            'default': {
                'level': 'INFO',
                'class': 'logging.StreamHandler',
            },
        },
        'formatters': {
            'detailed': {
                'format': ("%(asctime)s.%(msecs)03d %(levelname)s %(module)s - "
                           "%(funcName)s: %(message)s"),
            },
        },
        'loggers': {
            'driver': {
                'level': 'INFO',
                'handlers': ['file', ]
            },
            'executor': {
                'level': 'DEBUG',
                'handlers': ['file', ]
            },
        }
    }

    logging.config.dictConfig(log_settings)
    return logging.getLogger(name)
Run Code Online (Sandbox Code Playgroud)

然后,您可以导入此模块并为其logging自身别名:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Test logging") \
    .getOrCreate()

try:
    spark.sparkContext.addPyFile('s3://YOUR_BUCKET/spark_logging.py')
except:
    # Probably running this locally. Make sure to have spark_logging in the PYTHONPATH
    pass
finally:
    import spark_logging as logging

def map_sth(s):
    log3 = logging.getLogger("executor")
    log3.info("Logging from executor")

    if log3.isEnabledFor(logging.DEBUG):
        log3.debug("This statement is only logged when DEBUG is configured.")

    return s

def main():
    log2 = logging.getLogger("driver")
    log2.info("Logging from within module function on driver")
    spark.range(100).rdd.map(map_sth).count()

if __name__ == "__main__":
    log1 = logging.getLogger("driver")
    log1.info("logging from module level")
    main()
Run Code Online (Sandbox Code Playgroud)

Mariusz的回答一样,日志可以使用资源管理器访问(或者当LOG_DIRS你的环境变量不在你的temp文件夹中时).添加在此脚本顶部完成的错误处理,以便您可以在本地运行此脚本.

这种方法允许更多的自由:您可以让执行程序记录到一个文件,并在另一个文件中的驱动器上记录所有类型的聚合.

请注意,与使用类作为内置日志记录模块的代理相比,在这种情况下还有更多工作要做,因为每次在执行程序实例上请求记录器时,都必须对其进行配置.在进行大数据分析时,这可能不是您的主要时间.;-)