airflow {logging_mixin.py:84} WARNING - --- Logging error ---

Kad*_*Cho 5 logging encoding python-3.x airflow

I'm trying to build DAGs with MySqlOperator that execute queries read from files, one file per table.

But I'm running into the following error:

*** Reading local log.
[2018-07-17 10:42:53,897] {cli.py:374} INFO - Running on host joseongbins-MacBook-Air.local
[2018-07-17 10:42:53,913] {models.py:1197} INFO - Dependencies all met for <TaskInstance: dw_airflow_trial.dw_goods_md_info_period 2018-07-17 01:42:45.505791 [queued]>
[2018-07-17 10:42:53,916] {models.py:1197} INFO - Dependencies all met for <TaskInstance: dw_airflow_trial.dw_goods_md_info_period 2018-07-17 01:42:45.505791 [queued]>
[2018-07-17 10:42:53,916] {models.py:1407} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2018-07-17 10:42:53,925] {models.py:1428} INFO - Executing <Task(MySqlOperator): dw_goods_md_info_period> on 2018-07-17 01:42:45.505791
[2018-07-17 10:42:53,925] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run dw_airflow_trial dw_goods_md_info_period 2018-07-17T01:42:45.505791 --job_id 171 --raw -sd DAGS_FOLDER/dw_airflow.1.py']
[2018-07-17 10:42:54,660] {base_task_runner.py:98} INFO - Subtask: [2018-07-17 10:42:54,659] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-07-17 10:42:54,760] {base_task_runner.py:98} INFO - Subtask: [2018-07-17 10:42:54,759] {models.py:189} INFO - Filling up the DagBag from /Users/kadencho/airflow/dags/dw_airflow.1.py
[2018-07-17 10:42:54,866] {cli.py:374} INFO - Running on host joseongbins-MacBook-Air.local
[2018-07-17 10:42:54,882] {logging_mixin.py:84} WARNING - --- Logging error ---

[2018-07-17 10:42:54,884] {logging_mixin.py:84} WARNING - Traceback (most recent call last):

[2018-07-17 10:42:54,884] {logging_mixin.py:84} WARNING -   File "/Users/kadencho/anaconda3/lib/python3.6/logging/__init__.py", line 994, in emit
    stream.write(msg)

[2018-07-17 10:42:54,884] {logging_mixin.py:84} WARNING - UnicodeEncodeError: 'ascii' codec can't encode characters in position 157-158: ordinal not in range(128)

[2018-07-17 10:42:54,884] {logging_mixin.py:84} WARNING - Call stack:

[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING -   File "/Users/kadencho/anaconda3/bin/airflow", line 27, in <module>
    args.func(args)

[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING -   File "/Users/kadencho/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,

[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING -   File "/Users/kadencho/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)

[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING -   File "/Users/kadencho/anaconda3/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)

[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING -   File 
"/Users/kadencho/anaconda3/lib/python3.6/site- 
packages/airflow/operators/mysql_operator.py", line 49, in execute
    self.log.info('Executing: %s', self.sql)
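The traceback ends in the logging handler's `stream.write(msg)` call: the log stream is using the ASCII codec, so any non-ASCII character in the logged SQL blows up. A minimal reproduction of that failure mode (the Korean sample string below is my assumption, standing in for whatever non-ASCII text is in the query):

```python
# Reproduce the failure: encoding non-ASCII text with the 'ascii'
# codec fails exactly like the stream.write() in the traceback above.
try:
    "상품명".encode("ascii")  # placeholder non-ASCII text
except UnicodeEncodeError as exc:
    print(exc.reason)  # → ordinal not in range(128)
```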

My setup:

  • Airflow webserver running locally on macOS
  • the scheduler
  • the default SQLAlchemy metadata DB
  • queries that include some Unicode characters

The DAG script file looks like:

import os

from airflow import DAG
from airflow.operators.mysql_operator import MySqlOperator
from datetime import datetime, timedelta

DEFAULT_QUERY_PATH = '/Users/kadencho/git/data/tasks/dw/total/'
DEFAULT_DATABASE = ''

default_args = {
        'owner':'airflow',
        'depends_on_past':False,
        'start_date':datetime(2018, 7, 15),
        'email':['sbcho@.net'],
        'email_on_failure':True,
        'email_on_retry':False
        }


dag = DAG('dw_airflow_trial', schedule_interval="@once", default_args=default_args, max_active_runs=1)


def get_sql(fp, fn):
    with open(os.path.join(fp, fn), 'rb') as f:
        query = f.read().decode()
    return query


def get_mysql_operator(table_nm):
    operator_obj = MySqlOperator(
        task_id=table_nm,
        sql=get_sql(DEFAULT_QUERY_PATH, table_nm + '.sql'),
        database=DEFAULT_DATABASE,
        dag=dag,
        parameters={"charset":"utf-8"}
    )
    return operator_obj

dw_goods_md_info_period = get_mysql_operator('dw_goods_md_info_period')

dw_goods_set = get_mysql_operator('dw_goods_set')

dw_goods_md_info_period.set_downstream(dw_goods_set)
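As an aside, `f.read().decode()` already defaults to UTF-8 on Python 3, but opening the file in text mode with an explicit encoding makes the intent clearer and fails loudly on a mis-encoded file — a minimal alternative sketch of `get_sql`:

```python
import os

def get_sql(fp, fn):
    # Text mode with an explicit encoding instead of a manual decode();
    # a wrongly-encoded .sql file raises UnicodeDecodeError right here.
    with open(os.path.join(fp, fn), encoding="utf-8") as f:
        return f.read()
```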

The base configuration looks like:

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /Users/kadencho/airflow

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /Users/kadencho/airflow/dags

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /Users/kadencho/airflow/logs

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id =
encrypt_s3_logs = False

# Logging level
logging_level = INFO

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =

# Log format
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = SequentialExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = sqlite:////Users/kadencho/airflow/airflow.db

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool.
sql_alchemy_pool_size = 5

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite.
sql_alchemy_pool_recycle = 3600

My guess is that this comes from either the encoding used when inserting into the metadata DB or the log encoding. I've considered both but can't tell which.

tob*_*bi6 0

Try adding

# encoding=utf8

to the top of the DAG file.

Source