W.P*_*ill 36 python logging apache-spark
我有一个我运行的Python Spark程序spark-submit.我想把日志语句放在其中.
logging.info("This is an informative message.")
logging.debug("This is a debug message.")
Run Code Online (Sandbox Code Playgroud)
我想使用Spark正在使用的相同记录器,以便日志消息以相同的格式出现,并且级别由相同的配置文件控制.我该怎么做呢?
我已经尝试将logging语句放在代码中并以a开头logging.getLogger().在这两种情况下,我都看到Spark的日志消息,但不是我的.我一直在关注Python日志记录文档,但还是无法从中找到它.
不确定这是否是提交给Spark的脚本特有的,或者我不了解日志记录的工作原理.
Ale*_*x Q 39
您可以从SparkContext对象获取记录器:
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
Run Code Online (Sandbox Code Playgroud)
Cas*_*mon 16
您需要获取spark本身的记录器,默认情况下,getLogger()将返回您自己的模块的记录器.尝试类似的东西:
logger = logging.getLogger('py4j')
logger.info("My test info statement")
Run Code Online (Sandbox Code Playgroud)
它也可能是'pyspark'而不是'py4j'.
如果您在spark程序中使用的函数(以及执行某些日志记录)在与main函数相同的模块中定义,则会产生一些序列化错误.
我也在spark 1.3.1上测试了这个
编辑:
要将日志记录从STDERR更改为STDOUT,您必须删除当前的StreamHandler并添加一个新的StreamHandler.
找到现有的流处理程序(完成后可以删除此行)
print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]
Run Code Online (Sandbox Code Playgroud)
可能只有一个,但如果不是,你将不得不更新位置.
logger.removeHandler(logger.handlers[0])
Run Code Online (Sandbox Code Playgroud)
为sys.stdout添加新的处理程序
import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
Run Code Online (Sandbox Code Playgroud)
就我而言,我很高兴将我的日志消息与通常的 Spark 日志消息一起添加到工作线程 stderr 中。
如果这满足您的需求,那么技巧是将特定的 Python 记录器重定向到stderr.
例如,受此答案启发,以下内容对我来说效果很好:
def getlogger(name, level=logging.INFO):
import logging
import sys
logger = logging.getLogger(name)
logger.setLevel(level)
if logger.handlers:
# or else, as I found out, we keep adding handlers and duplicate messages
pass
else:
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
Run Code Online (Sandbox Code Playgroud)
用法:
def tst_log():
logger = getlogger('my-worker')
logger.debug('a')
logger.info('b')
logger.warning('c')
logger.error('d')
logger.critical('e')
...
Run Code Online (Sandbox Code Playgroud)
输出(加上一些周围的上下文行):
17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver
Run Code Online (Sandbox Code Playgroud)
我们需要从执行者而不是驱动程序节点登录。因此,我们执行了以下操作:
我们/etc/rsyslog.d/spark.conf在所有节点上创建了一个(使用Bootstrap方法,将Amazon Elastic Map Reduce so that the Core nodes forwarded sysloglocal1`消息发送到主节点。
在主节点上,我们启用了UDP和TCP syslog侦听器,并将其设置为使所有local消息都记录到/var/log/local1.log。
我们logging在地图函数中创建了一个Python 模块Syslog记录器。
现在我们可以使用登录logging.info()。...
我们发现的一件事是,同一分区正在多个执行器上同时处理。显然,只要有额外的资源,Spark就会一直这样做。当执行者被神秘地延迟或失败时,这可以处理这种情况。
登录map功能使我们了解了很多Spark的工作原理。