如何从我的Python Spark脚本登录

W.P*_*ill 36 python logging apache-spark

我有一个我运行的Python Spark程序spark-submit.我想把日志语句放在其中.

logging.info("This is an informative message.")
logging.debug("This is a debug message.")
Run Code Online (Sandbox Code Playgroud)

我想使用Spark正在使用的相同记录器,以便日志消息以相同的格式出现,并且级别由相同的配置文件控制.我该怎么做呢?

我已经尝试将logging语句放在代码中并以a开头logging.getLogger().在这两种情况下,我都看到Spark的日志消息,但不是我的.我一直在关注Python日志记录文档,但还是无法从中找到它.

不确定这是否是提交给Spark的脚本特有的,或者我不了解日志记录的工作原理.

Ale*_*x Q 39

您可以从SparkContext对象获取记录器:

log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
Run Code Online (Sandbox Code Playgroud)

  • 这绝对允许我像Spark那样记录(谢谢!).有没有办法从SparkContext获取此记录器?在创建SparkContext之前,我有一些必须打印的日志 (4认同)
  • 我试图在PySpark中使用这个想法时遇到错误.我所做的是尝试将记录器存储为全局,然后当它不起作用时尝试将上下文本身存储为全局.我的用例是能够在foreach函数内的执行器上进行日志记录调用(它没有spark上下文)."例外:您似乎试图从广播变量,操作或转换中引用SparkContext.SparkContext只能用于驱动程序,而不能用于在工作程序上运行的代码.有关更多信息,请参阅SPARK-5063." (4认同)
  • 我遇到了问题:logger = logging.getLogger('py4j')TypeError:'JavaPackage'对象不可调用 (2认同)
  • @marlieg 在创建 spark 上下文之前,您无权访问 spark 日志记录。 (2认同)
  • 我可以正常工作,但无法弄清楚日志存储在哪里,有人可以帮助我吗 (2认同)

Cas*_*mon 16

您需要获取spark本身的记录器,默认情况下,getLogger()将返回您自己的模块的记录器.尝试类似的东西:

logger = logging.getLogger('py4j')
logger.info("My test info statement")
Run Code Online (Sandbox Code Playgroud)

它也可能是'pyspark'而不是'py4j'.

如果您在spark程序中使用的函数(以及执行某些日志记录)在与main函数相同的模块中定义,则会产生一些序列化错误.

这说明在这里,由同一人给出一个例子在这里

我也在spark 1.3.1上测试了这个

编辑:

要将日志记录从STDERR更改为STDOUT,您必须删除当前的StreamHandler并添加一个新的StreamHandler.

找到现有的流处理程序(完成后可以删除此行)

print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]
Run Code Online (Sandbox Code Playgroud)

可能只有一个,但如果不是,你将不得不更新位置.

logger.removeHandler(logger.handlers[0])
Run Code Online (Sandbox Code Playgroud)

为sys.stdout添加新的处理程序

import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
Run Code Online (Sandbox Code Playgroud)

  • 我很想回答这个问题,因为它对我不起作用.通过pyspark源代码,pyspark从不配置py4j记录器,而py4j使用java.utils.logging而不是spark使用的log4j记录器,所以我怀疑这会起作用.我认为这可能适用于主节点上的代码,但不适用于在worker上运行的任何代码. (4认同)
  • 这需要被否决。py4j 使用 java.utils.logging 而不是 log4j 记录器。鉴于这种方法,我无法让它发挥作用。 (3认同)

Pie*_*e D 6

就我而言,我很高兴将我的日志消息与通常的 Spark 日志消息一起添加到工作线程 stderr 中。

如果这满足您的需求,那么技巧是将特定的 Python 记录器重定向到stderr.

例如,受此答案启发,以下内容对我来说效果很好:

def getlogger(name, level=logging.INFO):
    import logging
    import sys

    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        # or else, as I found out, we keep adding handlers and duplicate messages
        pass
    else:
        ch = logging.StreamHandler(sys.stderr)
        ch.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger
Run Code Online (Sandbox Code Playgroud)

用法:

def tst_log():
    logger = getlogger('my-worker')
    logger.debug('a')
    logger.info('b')
    logger.warning('c')
    logger.error('d')
    logger.critical('e')
    ...
Run Code Online (Sandbox Code Playgroud)

输出(加上一些周围的上下文行):

17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver
Run Code Online (Sandbox Code Playgroud)


vy3*_*y32 5

我们需要从执行者而不是驱动程序节点登录。因此,我们执行了以下操作:

  1. 我们/etc/rsyslog.d/spark.conf在所有节点上创建了一个(使用Bootstrap方法,将Amazon Elastic Map Reduce so that the Core nodes forwarded sysloglocal1`消息发送到主节点。

  2. 在主节点上,我们启用了UDP和TCP syslog侦听器,并将其设置为使所有local消息都记录到/var/log/local1.log

  3. 我们logging在地图函数中创建了一个Python 模块Syslog记录器。

  4. 现在我们可以使用登录logging.info()。...

我们发现的一件事是,同一分区正在多个执行器上同时处理。显然,只要有额外的资源,Spark就会一直这样做。当执行者被神秘地延迟或失败时,这可以处理这种情况。

登录map功能使我们了解了很多Spark的工作原理。