删除Airflow任务日志

jom*_*mpa 22 airflow

我正在运行5个DAG,它们在base_log_folder一个月内生成了大约6GB的日志数据.我只是添加了一个,remote_base_log_folder但似乎它不排除登录到base_log_folder.

是否有自动删除旧日志文件,旋转它们或强制气流不能仅在远程存储中登录磁盘(base_log_folder)?

Vin*_*ala 14

请参阅https://github.com/teamclairvoyant/airflow-maintenance-dags

这个插件有DAG可以杀死暂停的任务和日志清理.您可以获取概念,并可以根据您的要求提供可以清理的新DAG.


Fra*_*nzi 14

我们通过实现我们自己的来删除任务日志FileTaskHandler,然后在airflow.cfg. 因此,我们覆盖默认的 LogHandler 以仅保留 N 个任务日志,而不安排额外的 DAG。

我们正在使用Airflow==1.10.1.

[core]
logging_config_class = log_config.LOGGING_CONFIG
Run Code Online (Sandbox Code Playgroud)

log_config.LOGGING_CONFIG

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
FOLDER_TASK_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}'
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

LOGGING_CONFIG = {
    'formatters': {},
    'handlers': {
        '...': {},
        'task': {
            'class': 'file_task_handler.FileTaskRotationHandler',
            'formatter': 'airflow.job',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
            'folder_task_template': FOLDER_TASK_TEMPLATE,
            'retention': 20
        },
        '...': {}
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['task'],
            'level': JOB_LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        '...': {}
    }
}
Run Code Online (Sandbox Code Playgroud)

file_task_handler.FileTaskRotationHandler

import os
import shutil

from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler


class FileTaskRotationHandler(FileTaskHandler):

    def __init__(self, base_log_folder, filename_template, folder_task_template, retention):
        """
        :param base_log_folder: Base log folder to place logs.
        :param filename_template: template filename string.
        :param folder_task_template: template folder task path.
        :param retention: Number of folder logs to keep
        """
        super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template)
        self.retention = retention
        self.folder_task_template, self.folder_task_template_jinja_template = \
            parse_template_string(folder_task_template)

    @staticmethod
    def _get_directories(path='.'):
        return next(os.walk(path))[1]

    def _render_folder_task_path(self, ti):
        if self.folder_task_template_jinja_template:
            jinja_context = ti.get_template_context()
            return self.folder_task_template_jinja_template.render(**jinja_context)

        return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id)

    def _init_file(self, ti):
        relative_path = self._render_folder_task_path(ti)
        folder_task_path = os.path.join(self.local_base, relative_path)
        subfolders = self._get_directories(folder_task_path)
        to_remove = set(subfolders) - set(subfolders[-self.retention:])

        for dir_to_remove in to_remove:
            full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove)
            print('Removing', full_dir_to_remove)
            shutil.rmtree(full_dir_to_remove)

        return FileTaskHandler._init_file(self, ti)
Run Code Online (Sandbox Code Playgroud)


zho*_*jie 6

Airflow维护者不认为截断日志是Airflow核心逻辑的一部分,看到这一点,然后在这个问题上,维护者建议更改LOG_LEVEL以避免太多日志数据。

这个PR 中,我们可以学习如何更改airflow.cfg.

祝你好运。

  • 那是因为气流维持者很懒 (11认同)

non*_*ono -1

我不认为存在轮换机制,但您可以将它们存储在 S3 或谷歌云存储中,如下所述: https: //airflow.incubator.apache.org/configuration.html#logs

  • 谢谢,我正在使用远程日志记录选项和 `remote_base_log_folder` 将日志文件存储在 GCS 上。这会将日志添加到 GCS,但不会在本地删除它们。我想我想知道其他人如何处理生产中填满磁盘的大量日志文件。 (15认同)