Airflow不会将日志写入s3

Yan*_*SSE 5 amazon-s3 airflow

我尝试了不同的方法配置Airflow 1.9将日志写入s3,但它只是忽略它.我发现很多人在这样做之后阅读日志时遇到了问题,但我的问题是日志仍然是本地的.我可以毫无问题地阅读它们,但它们不在指定的s3存储桶中.

我尝试的是首先写入airflow.cfg文件

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False
Run Code Online (Sandbox Code Playgroud)

然后我尝试设置环境变量

AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
Run Code Online (Sandbox Code Playgroud)

但是它会被忽略,日志文件仍然是本地的.

我从一个容器运行气流,我调整了https://github.com/puckel/docker-airflow到我的情况,但它不会将日志写入s3.我使用aws连接写入dags中的存储桶,这可以工作,但无论是在EC2上还是在我的机器上本地运行,Logs都保持在本地状态.

Yan*_*mer 5

我终于使用/sf/answers/3427859501/找到了一个答案 ,这是我接下来要做的大部分工作。我在这里重现这个答案并按照我的方式进行调整:

要检查的一些事项:

  1. 确保您拥有该log_config.py文件并且它位于正确的目录中:./config/log_config.py.
  2. 确保您没有忘记该__init__.py目录中的文件。
  3. 确保您定义了s3.task处理程序并将其格式化程序设置为airflow.task
  4. 确保将airflow.task 和airflow.task_runner 处理程序设置为s3.task
  5. 设置task_log_reader = s3.taskairflow.cfg
  6. 传递S3_LOG_FOLDERlog_config. 我是使用一个变量来实现的,并如下检索它log_config.py

这是一个有效的 log_config.py:

import os

from airflow import configuration as conf


LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
       's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,这种方式S3_LOG_FOLDER可以airflow.cfg在变量中指定或作为环境变量AIRFLOW__CORE__S3_LOG_FOLDER