如何为 Airflow 连接显式声明 charset=utf8

Aar*_*ron 5 python mysql airflow apache-airflow

这个顺序:

from airflow.hooks.mysql_hook import MySqlHook
conn = MySqlHook(mysql_conn_id='conn_id')
engine = conn.get_sqlalchemy_engine()
df.to_sql('test_table', engine, if_exists='append', index=False)
Run Code Online (Sandbox Code Playgroud)

产生以下内容:

UnicodeEncodeError: 'latin-1' codec can't encode character '\ufffd' in position 57: ordinal not in range(256)
Run Code Online (Sandbox Code Playgroud)

这个序列效果很好:

from sqlalchemy import create_engine
engine = create_engine("mysql://{0}:{1}@{2}/capone?charset=utf8".format(user, pwd, host))
df.to_sql('test_table', engine, if_exists='append', index=False)
Run Code Online (Sandbox Code Playgroud)

关键在于明确声明charset. 我试图在气流中做到这一点,如下所示{"charset": "utf8"}

在此处输入图片说明

但这并没有修复错误。自从进行更改后,我已经重新启动了我的开发环境,并且管理面板让我知道编辑成功。如何将 Airflow 连接到我的字符集作为 utf8?

lol*_*ode 2

我意识到这是 Airflow 中的一个错误,我已在这里报告:https ://issues.apache.org/jira/browse/AIRFLOW-4824

现在我有一个解决方法,代码如下:

def get_uri(hook):
    conn = hook.get_connection(getattr(hook, hook.conn_name_attr))
    login = ''
    if conn.login:
        login = '{conn.login}:{conn.password}@'.format(conn=conn)
    host = conn.host
    if conn.port is not None:
        host += ':{port}'.format(port=conn.port)
    charset = ''
    if conn.extra_dejson.get('charset', False):
        chrs = conn.extra_dejson["charset"]
        if chrs.lower() == 'utf8' or chrs.lower() == 'utf-8':
            charset = '?charset=utf8'
    return '{conn.conn_type}://{login}{host}/{conn.schema}{charset}'.format(
        conn=conn, login=login, host=host, charset=charset)
Run Code Online (Sandbox Code Playgroud)

然后使用它如下:

url = get_uri(sql_hook)
from sqlalchemy import create_engine
engine = create_engine(url)
Run Code Online (Sandbox Code Playgroud)

真正的解决方案是将拉取请求发送到覆盖 mysql_hook.py 中的 get_uri 的项目。