Mat*_*ett 6 python logging airflow python-jsonschema
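I need Apache Airflow to write its logs to stdout in JSON format. Airflow does not appear to support this out of the box. I found a couple of Python modules that can do this, but I cannot get the implementation to work.
Currently, I am adding a class to airflow/utils/logging.py to modify the logger, as follows: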
import datetime

from pythonjsonlogger import jsonlogger


class StackdriverJsonFormatter(jsonlogger.JsonFormatter, object):
    def __init__(self, fmt="%(levelname) %(asctime) %(nanotime) %(severity) %(message)", style='%', *args, **kwargs):
        jsonlogger.JsonFormatter.__init__(self, fmt=fmt, *args, **kwargs)

    def process_log_record(self, log_record):
        # Rename the level field to "severity".
        if log_record.get('level'):
            log_record['severity'] = log_record['level']
            del log_record['level']
        else:
            log_record['severity'] = log_record['levelname']
            del log_record['levelname']
        # Rename "asctime" to "timestamp".
        if log_record.get('asctime'):
            log_record['timestamp'] = log_record['asctime']
            del log_record['asctime']
        # Add a microsecond-resolution timestamp.
        now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        log_record['nanotime'] = now
        return super(StackdriverJsonFormatter, self).process_log_record(log_record)
I am calling this code from /airflow/settings.py as follows:
import sys

from airflow.utils import logging as logconf


# LOG_FORMAT and LOGGING_LEVEL are expected to be defined elsewhere in settings.py.
def configure_logging(log_format=LOG_FORMAT):
    handler = logconf.logging.StreamHandler(sys.stdout)
    formatter = logconf.StackdriverJsonFormatter()
    handler.setFormatter(formatter)
    logging = logconf.logging.getLogger()
    logging.addHandler(handler)
    ''' code below was original airflow source code
    logging.root.handlers = []
    logging.basicConfig(
        format=log_format, stream=sys.stdout, level=LOGGING_LEVEL)
    '''
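I have tried several variations of this, but I cannot get python-json-logger to convert the logs to JSON. Perhaps I am not reaching the root logger? Another option I considered is manually formatting the logs as a JSON string, but I had no luck with that either. Any alternative ideas, tips, or support are appreciated.
Cheers!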
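I don't know whether you ever solved this, but after some frustrating tinkering I got it to work nicely with Airflow. For reference, I followed most of this article to get it working: https://www.astronomer.io/guides/logging/. The main problem is that Airflow's logging only accepts a string template for the log format, and json-logging cannot plug into that template. So you have to create your own logging handler classes and connect them to a custom logging config class.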
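Copy the log template from here into your $AIRFLOW_HOME/config folder and rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG. If this worked, Airflow prints a log message at startup that says Successfully imported user-defined logging config from logging_config.LOGGING_CONFIG. If this is the first .py file in the config folder, don't forget to add an empty __init__.py file so that Python picks it up.
Write a custom JsonFormatter to inject into your handlers. I made mine with this.
Write the custom log handler classes. Since I was after JSON logging, mine look like this: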
from logging.handlers import RotatingFileHandler

from airflow.utils.log.file_processor_handler import FileProcessorHandler
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import RedirectStdHandler
from pythonjsonlogger import jsonlogger

# CustomJsonFormatter is the formatter written in the previous step;
# import it from wherever you defined it.
# The %(name)s form is used so the format string also passes the
# format validation that logging.Formatter performs on Python 3.8+.
JSON_FORMAT = '%(timestamp)s %(level)s %(name)s %(message)s'


class JsonStreamHandler(RedirectStdHandler):
    def __init__(self, stream):
        super(JsonStreamHandler, self).__init__(stream)
        json_formatter = CustomJsonFormatter(JSON_FORMAT)
        self.setFormatter(json_formatter)


class JsonFileTaskHandler(FileTaskHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileTaskHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter(JSON_FORMAT)
        self.setFormatter(json_formatter)


class JsonFileProcessorHandler(FileProcessorHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileProcessorHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter(JSON_FORMAT)
        self.setFormatter(json_formatter)


class JsonRotatingFileHandler(RotatingFileHandler):
    def __init__(self, filename, mode, maxBytes, backupCount):
        super(JsonRotatingFileHandler, self).__init__(filename, mode, maxBytes, backupCount)
        json_formatter = CustomJsonFormatter(JSON_FORMAT)
        self.setFormatter(json_formatter)
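The handlers above rely on a CustomJsonFormatter that the answer does not show. As a rough illustration only, here is a standard-library stand-in (the name StandInJsonFormatter and its field handling are assumptions, not the answer's actual class and not python-json-logger's API). It reads the field names out of the format string and fills in timestamp and level itself:

```python
import datetime
import io
import json
import logging
import re


class StandInJsonFormatter(logging.Formatter):
    """Hypothetical stdlib-only formatter that emits one JSON object per record."""

    def __init__(self, fmt="%(timestamp)s %(level)s %(name)s %(message)s"):
        super().__init__(fmt)
        # Treat every name inside parentheses in the format string as a field.
        self.fields = re.findall(r"\((.+?)\)", fmt)

    def format(self, record):
        record.message = record.getMessage()
        created = datetime.datetime.fromtimestamp(
            record.created, datetime.timezone.utc
        )
        # Fields that do not exist on a plain LogRecord are computed here.
        computed = {
            "timestamp": created.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            "level": record.levelname,
        }
        log_record = {}
        for field in self.fields:
            log_record[field] = computed.get(field, getattr(record, field, None))
        return json.dumps(log_record, default=str)


# Usage: log into an in-memory stream and parse the result back.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(StandInJsonFormatter())
logger = logging.getLogger("formatter_demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False
logger.warning("disk usage at %d%%", 91)
entry = json.loads(stream.getvalue())
```

A formatter built on python-json-logger would normally subclass jsonlogger.JsonFormatter instead; the sketch only shows the kind of output the handlers expect.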
# In logging_config.py, point each handler at the custom classes
# (defined here in a module named logging_handler):
'handlers': {
    'console': {
        'class': 'logging_handler.JsonStreamHandler',
        'stream': 'sys.stdout'
    },
    'task': {
        'class': 'logging_handler.JsonFileTaskHandler',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'logging_handler.JsonFileProcessorHandler',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    }
}
...
and
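DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging_handler.JsonRotatingFileHandler',
            # Note: logging.config.dictConfig applies a 'formatter' entry after
            # the handler is constructed, which replaces the formatter set in
            # __init__. Drop this key if this handler's output is not JSON.
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5
        }
    }
    ...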
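To see how a handler entry in the config dict interacts with a formatter set inside the handler's __init__, here is a small standard-library sketch. It assumes hypothetical names (TinyJsonFormatter, JsonBufferHandler, the json_demo logger) and uses the '()' factory key with a class object so that it is self-contained, whereas the config above uses 'class' with a dotted path. In both cases logging.config.dictConfig passes the remaining keys to the constructor as keyword arguments and, if a 'formatter' key is present, calls setFormatter afterwards:

```python
import io
import json
import logging
import logging.config


class TinyJsonFormatter(logging.Formatter):
    """Hypothetical stand-in for a JSON formatter."""

    def format(self, record):
        return json.dumps({"level": record.levelname, "message": record.getMessage()})


class JsonBufferHandler(logging.StreamHandler):
    """Hypothetical handler that installs its own formatter in __init__."""

    def __init__(self, stream):
        super().__init__(stream)
        self.setFormatter(TinyJsonFormatter())


def build_config(stream, with_formatter_key):
    # 'stream' is passed to JsonBufferHandler.__init__ as a keyword argument.
    handler = {"()": JsonBufferHandler, "stream": stream}
    if with_formatter_key:
        handler["formatter"] = "plain"
    return {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {"plain": {"format": "%(levelname)s:%(message)s"}},
        "handlers": {"demo": handler},
        "loggers": {
            "json_demo": {"handlers": ["demo"], "level": "INFO", "propagate": False}
        },
    }


def emit(with_formatter_key):
    stream = io.StringIO()
    logging.config.dictConfig(build_config(stream, with_formatter_key))
    logging.getLogger("json_demo").info("hello")
    return stream.getvalue().strip()


json_line = emit(with_formatter_key=False)   # formatter from __init__ is kept
plain_line = emit(with_formatter_key=True)   # 'formatter' key replaces it
```

This is why the handler constructors above take exactly the keys listed in their config entries, and why a leftover 'formatter' key can silently switch a handler back to plain-text output.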
With that in place, JSON logs should be emitted both in the DAG logs and on standard output.
Hope this helps!