Aar*_*ron 5 python mysql airflow apache-airflow
这个顺序:
from airflow.hooks.mysql_hook import MySqlHook
conn = MySqlHook(mysql_conn_id='conn_id')
engine = conn.get_sqlalchemy_engine()
df.to_sql('test_table', engine, if_exists='append', index=False)
Run Code Online (Sandbox Code Playgroud)
产生以下内容:
UnicodeEncodeError: 'latin-1' codec can't encode character '\ufffd' in position 57: ordinal not in range(256)
Run Code Online (Sandbox Code Playgroud)
这个序列效果很好:
from sqlalchemy import create_engine
engine = create_engine("mysql://{0}:{1}@{2}/capone?charset=utf8".format(user, pwd, host))
df.to_sql('test_table', engine, if_exists='append', index=False)
Run Code Online (Sandbox Code Playgroud)
关键在于明确声明charset. 我试图在气流中做到这一点,如下所示{"charset": "utf8"}:
但这并没有修复错误。自从进行更改后,我已经重新启动了我的开发环境,并且管理面板让我知道编辑成功。如何将 Airflow 连接到我的字符集作为 utf8?
我意识到这是 Airflow 中的一个错误,我已在这里报告:https ://issues.apache.org/jira/browse/AIRFLOW-4824
现在我有一个解决方法,代码如下:
def get_uri(hook):
conn = hook.get_connection(getattr(hook, hook.conn_name_attr))
login = ''
if conn.login:
login = '{conn.login}:{conn.password}@'.format(conn=conn)
host = conn.host
if conn.port is not None:
host += ':{port}'.format(port=conn.port)
charset = ''
if conn.extra_dejson.get('charset', False):
chrs = conn.extra_dejson["charset"]
if chrs.lower() == 'utf8' or chrs.lower() == 'utf-8':
charset = '?charset=utf8'
return '{conn.conn_type}://{login}{host}/{conn.schema}{charset}'.format(
conn=conn, login=login, host=host, charset=charset)
Run Code Online (Sandbox Code Playgroud)
然后使用它如下:
url = get_uri(sql_hook)
from sqlalchemy import create_engine
engine = create_engine(url)
Run Code Online (Sandbox Code Playgroud)
真正的解决方案是将拉取请求发送到覆盖 mysql_hook.py 中的 get_uri 的项目。
| 归档时间: |
|
| 查看次数: |
2718 次 |
| 最近记录: |