Kad*_*Cho | tags: logging, encoding, python-3.x, airflow
I am trying to build MySqlOperator DAGs that execute the processing query for each table, reading each query from its own file. But I am facing the following error:
*** Reading local log.
[2018-07-17 10:42:53,897] {cli.py:374} INFO - Running on host joseongbins-MacBook-Air.local
[2018-07-17 10:42:53,913] {models.py:1197} INFO - Dependencies all met for <TaskInstance: dw_airflow_trial.dw_goods_md_info_period 2018-07-17 01:42:45.505791 [queued]>
[2018-07-17 10:42:53,916] {models.py:1197} INFO - Dependencies all met for <TaskInstance: dw_airflow_trial.dw_goods_md_info_period 2018-07-17 01:42:45.505791 [queued]>
[2018-07-17 10:42:53,916] {models.py:1407} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2018-07-17 10:42:53,925] {models.py:1428} INFO - Executing <Task(MySqlOperator): dw_goods_md_info_period> on 2018-07-17 01:42:45.505791
[2018-07-17 10:42:53,925] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run dw_airflow_trial dw_goods_md_info_period 2018-07-17T01:42:45.505791 --job_id 171 --raw -sd DAGS_FOLDER/dw_airflow.1.py']
[2018-07-17 10:42:54,660] {base_task_runner.py:98} INFO - Subtask: [2018-07-17 10:42:54,659] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-07-17 10:42:54,760] {base_task_runner.py:98} INFO - Subtask: [2018-07-17 10:42:54,759] {models.py:189} INFO - Filling up the DagBag from /Users/kadencho/airflow/dags/dw_airflow.1.py
[2018-07-17 10:42:54,866] {cli.py:374} INFO - Running on host joseongbins-MacBook-Air.local
[2018-07-17 10:42:54,882] {logging_mixin.py:84} WARNING - --- Logging error ---
[2018-07-17 10:42:54,884] {logging_mixin.py:84} WARNING - Traceback (most recent call last):
[2018-07-17 10:42:54,884] {logging_mixin.py:84} WARNING - File "/Users/kadencho/anaconda3/lib/python3.6/logging/__init__.py", line 994, in emit
stream.write(msg)
[2018-07-17 10:42:54,884] {logging_mixin.py:84} WARNING - UnicodeEncodeError: 'ascii' codec can't encode characters in position 157-158: ordinal not in range(128)
[2018-07-17 10:42:54,884] {logging_mixin.py:84} WARNING - Call stack:
[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING - File "/Users/kadencho/anaconda3/bin/airflow", line 27, in <module>
args.func(args)
[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING - File "/Users/kadencho/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING - File "/Users/kadencho/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING - File "/Users/kadencho/anaconda3/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
result = task_copy.execute(context=context)
[2018-07-17 10:42:54,888] {logging_mixin.py:84} WARNING - File
"/Users/kadencho/anaconda3/lib/python3.6/site-
packages/airflow/operators/mysql_operator.py", line 49, in execute
self.log.info('Executing: %s', self.sql)
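The traceback ends inside `stream.write(msg)` with the `ascii` codec. The same failure can be reproduced outside Airflow, as a minimal sketch, by writing non-ASCII text (for example Korean text from a .sql file) to a stream that was opened with the ASCII codec; the query string below is illustrative:

```python
import io

# The log stream in the traceback was effectively opened with the ASCII
# codec; writing any non-ASCII character then fails at stream.write(msg),
# exactly as in the log above.
ascii_stream = io.TextIOWrapper(io.BytesIO(), encoding="ascii")

try:
    ascii_stream.write("SELECT '상품';")  # illustrative non-ASCII SQL
    ascii_stream.flush()
except UnicodeEncodeError as exc:
    print("reproduced:", exc.encoding)  # prints "reproduced: ascii"
```

This suggests the problem is the encoding of the stream the logger writes to, not the SQL file itself.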
The script file I use is set up like this:
import os
from airflow import DAG
from airflow.operators.mysql_operator import MySqlOperator
from datetime import datetime, timedelta

DEFAULT_QUERY_PATH = '/Users/kadencho/git/data/tasks/dw/total/'
DEFAULT_DATABASE = ''

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 7, 15),
    'email': ['sbcho@.net'],
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG('dw_airflow_trial', schedule_interval="@once", default_args=default_args, max_active_runs=1)

def get_sql(fp, fn):
    with open(os.path.join(fp, fn), 'rb') as f:
        query = f.read().decode()
    return query

def get_mysql_operator(table_nm):
    operator_obj = MySqlOperator(
        task_id=table_nm,
        sql=get_sql(DEFAULT_QUERY_PATH, table_nm + '.sql'),
        database=DEFAULT_DATABASE,
        dag=dag,
        parameters={"charset": "utf-8"}
    )
    return operator_obj

dw_goods_md_info_period = get_mysql_operator('dw_goods_md_info_period')
dw_goods_set = get_mysql_operator('dw_goods_set')
dw_goods_md_info_period.set_downstream(dw_goods_set)
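One step worth ruling out is the file read in get_sql. In Python 3, bytes.decode() already defaults to UTF-8, so the helper should return a proper unicode string; a hedged variant with the encoding spelled out (the file name and query content below are illustrative) can confirm this:

```python
import os
import tempfile

def get_sql(fp, fn):
    # Same as the DAG's helper, but with the encoding made explicit.
    with open(os.path.join(fp, fn), encoding="utf-8") as f:
        return f.read()

# Illustrative round-trip with a non-ASCII query.
tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "demo.sql"), "w", encoding="utf-8") as f:
    f.write("SELECT * FROM goods WHERE name = '상품';")

query = get_sql(tmp_dir, "demo.sql")
print(isinstance(query, str))  # True: decoding succeeded
```

If this works, the decoding of the SQL file is fine and the error happens later, when the operator logs the query.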
The basic configuration looks like this:
[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /Users/kadencho/airflow
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /Users/kadencho/airflow/dags
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /Users/kadencho/airflow/logs
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id =
encrypt_s3_logs = False
# Logging level
logging_level = INFO
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =
# Log format
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = SequentialExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = sqlite:////Users/kadencho/airflow/airflow.db
# The SqlAlchemy pool size is the maximum number of database connections
# in the pool.
sql_alchemy_pool_size = 5
# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite.
sql_alchemy_pool_recycle = 3600
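Nothing in the [core] section above controls the stream encoding. On macOS this class of error is often caused by an unset locale, which makes Python fall back to ASCII for its output streams; a common workaround to try (an assumption to verify, not an Airflow setting) is to export a UTF-8 locale in the shell that launches the scheduler and webserver:

```shell
# Force UTF-8 for the session that launches Airflow.
export LANG=en_US.UTF-8
export LC_ALL=en_US.UTF-8
export PYTHONIOENCODING=utf-8

# Sanity check: the interpreter should now report utf-8 for stdout.
python3 -c 'import sys; print(sys.stdout.encoding)'
```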
My guess is that this comes either from the encoding of inserts into the metadata DB or from the log encoding, both of which I have already considered.