dav*_*idb 0 airflow apache-airflow-xcom
我的任务是创建一个 POC,人们可以在其中从数据库中获取数据、处理它并将其发送到 S3。我仍在学习 Airflow,有些事情我还没有完全理解。我希望你能帮助我。所以我现在正在尝试从 mysql db 中获取数据并将其打印出来。问题是我总是得到“无”返回。我正在尝试使用 xcom 但它不起作用,我读到我不应该为此使用 xcom。这是我现在所拥有的:
def print_query():
query = ti.xcom_pull(task_ids='mysql_query')
print(query)
default_args = {
'owner': 'me',
'start_date': dt.datetime(2019, 8, 15),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}
dag = DAG('s3_dag_test', default_args=default_args)
python = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_query,
dag=dag)
query = MySqlOperator(
task_id='mysql_query',
sql='SELECT * FROM sakila.actor',
mysql_conn_id='mysql_db',
dag=dag)
query >> python
Run Code Online (Sandbox Code Playgroud)
第一个任务运行良好(mysql_query),但是第二个任务失败,因为我没有。怎么了?
另外,我得到的实际错误是这样的:
ERROR - print_query() got an unexpected keyword argument 'dag'
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?
要在你的 python 函数中使用上下文,你必须使用 kwargs 并将你的可调用对象定义为
def print_query(**kwargs):
Run Code Online (Sandbox Code Playgroud)
然后您可以将其用作:
def print_query(**kwargs):
query = kwargs['ti'].xcom_pull(task_ids='mysql_query')
print(query)
Run Code Online (Sandbox Code Playgroud)
或者您可以ti明确添加:
def print_query(ti, **kwargs):
query = ti.xcom_pull(task_ids='mysql_query')
print(query)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1703 次 |
| 最近记录: |