我一直在寻找关于这个主题的各种答案,但一直未能找到有效的解决方案.
我有气流设置到Log to s3但UI似乎只使用基于文件的任务处理程序而不是指定的S3.
我有s3连接设置如下
Conn_id = my_conn_S3
Conn_type = S3
Extra = {"region_name": "us-east-1"}
Run Code Online (Sandbox Code Playgroud)
(ECS实例使用具有完全s3权限的角色)
我还创建了一个log_config文件,其中包含以下设置
remote_log_conn_id = my_conn_S3
encrypt_s3_logs = False
logging_config_class = log_config.LOGGING_CONFIG
task_log_reader = s3.task
Run Code Online (Sandbox Code Playgroud)
在我的日志配置中,我有以下设置
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
S3_LOG_FOLDER = 's3://data-team-airflow-logs/airflow-master-tester/'
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'airflow.task': {
'format': LOG_FORMAT,
},
'airflow.processor': {
'format': LOG_FORMAT,
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'airflow.task',
'stream': 'ext://sys.stdout'
},
'file.processor': {
'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
'formatter': 'airflow.processor',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
},
# When using s3 or gcs, provide a customized LOGGING_CONFIG
# in airflow_local_settings within your PYTHONPATH, see UPDATING.md
# for details
's3.task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': S3_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
},
'loggers': {
'': {
'handlers': ['console'],
'level': LOG_LEVEL
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.processor': {
'handlers': ['file.processor'],
'level': LOG_LEVEL,
'propagate': True,
},
'airflow.task': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.task_runner': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': True,
},
}
}
Run Code Online (Sandbox Code Playgroud)
我可以在S3上看到日志,但是当我导航到UI日志时,我得到的就是
*** Log file isn't local.
*** Fetching here: http://1eb84d89b723:8793/log/hermes_pull_double_click_click/hermes_pull_double_click_click/2018-02-26T11:22:00/1.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='1eb84d89b723', port=8793): Max retries exceeded with url: /log/hermes_pull_double_click_click/hermes_pull_double_click_click/2018-02-26T11:22:00/1.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fe6940fc048>: Failed to establish a new connection: [Errno -2] Name or service not known',))
Run Code Online (Sandbox Code Playgroud)
我可以在日志中看到它成功导入了log_config.py(我还包括了一个init .py)
无法理解为什么它在这里使用FileTaskHandler而不是S3
任何帮助都会很棒,谢谢
在我的情况下,这里的问题不是气流。
我能够去 gitter 频道和那里的人交谈。
将打印语句放入正在运行的 python 代码后,我能够在这行代码中捕获异常。
例外是 SSLContext 上的一个回避最大深度问题,在网上环顾四周后,这似乎来自使用 gevent 与 unicorn 的某种组合。
https://github.com/gevent/gevent/issues/903
我将其切换回同步,并且不得不将 AWS ELB 侦听器更改为 TCP,但之后日志通过 UI 工作正常
希望这对其他人有帮助。