气流从数据库中获取数据并打印出来

dav*_*idb 0 airflow apache-airflow-xcom

我的任务是创建一个 POC,人们可以在其中从数据库中获取数据、处理它并将其发送到 S3。我仍在学习 Airflow,有些事情我还没有完全理解。我希望你能帮助我。所以我现在正在尝试从 mysql db 中获取数据并将其打印出来。问题是我总是得到“无”返回。我正在尝试使用 xcom 但它不起作用,我读到我不应该为此使用 xcom。这是我现在所拥有的:

def print_query():
query = ti.xcom_pull(task_ids='mysql_query')
print(query)

default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2019, 8, 15),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

dag = DAG('s3_dag_test', default_args=default_args)

python = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_query,
    dag=dag)

query = MySqlOperator(
    task_id='mysql_query',
    sql='SELECT * FROM sakila.actor',
    mysql_conn_id='mysql_db',
    dag=dag)

query >> python
Run Code Online (Sandbox Code Playgroud)

第一个任务运行良好(mysql_query),但是第二个任务失败,因为我没有。怎么了?

另外,我得到的实际错误是这样的:

ERROR - print_query() got an unexpected keyword argument 'dag'
Run Code Online (Sandbox Code Playgroud)

有任何想法吗?

amo*_*iov 6

要在你的 python 函数中使用上下文,你必须使用 kwargs 并将你的可调用对象定义为

def print_query(**kwargs):
Run Code Online (Sandbox Code Playgroud)

然后您可以将其用作:

def print_query(**kwargs):
    query = kwargs['ti'].xcom_pull(task_ids='mysql_query')
    print(query)
Run Code Online (Sandbox Code Playgroud)

或者您可以ti明确添加:

def print_query(ti, **kwargs):
    query = ti.xcom_pull(task_ids='mysql_query')
    print(query)
Run Code Online (Sandbox Code Playgroud)