Airflow:如何从 BigQueryOperator 推动 xcom 价值?

Pro*_*120 5 airflow

这是我的操作员:

bigquery_check_op = BigQueryOperator(
    task_id='bigquery_check',
    bql=SQL_QUERY,
    use_legacy_sql = False,
    bigquery_conn_id=CONNECTION_ID,
    trigger_rule='all_success',
    xcom_push=True,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

当我检查 UI 中的渲染页面时。那里什么也没有出现。当我在控制台中运行 SQL 时,它返回1400正确的值。为什么运营商不推XCOM?

我无法使用BigQueryValueCheckOperator。该运算符被设计为在值检查时失败。我不想让任何事情失败。我只是想根据查询的返回值对代码进行分支。

dla*_*lin 1

最简单的答案是因为不是也不是 LoggingMixinxcom_push中的参数之一。BigQueryOperatorBaseOperator

BigQueryGetDataOperator确实返回(并因此推送)一些数据,但它是按表和列名称工作的。您可以通过将运行的查询输出到唯一命名的表(可能{{ds_nodash}}在名称中使用)来链接此行为,然后使用该表作为此运算符的源,然后可以使用BranchPythonOperator.

您可能会尝试使用来运行BigQueryHook查询get_conn().cursor()使用.BranchPythonOperator

在其他地方,我们聊天并想出了一些类似的东西来放入 a 的可调用部分BranchPythonOperator

cursor = BigQueryHook(bigquery_conn_id='connection_name').get_conn().cursor()
# one of these two:
cursor.execute(SQL_QUERY)  # if non-legacy
cursor.job_id = cursor.run_query(bql=SQL_QUERY, use_legacy_sql=False)  # if legacy
result=cursor.fetchone()
return "task_one" if result[0] is 1400 else "task_two"  # depends on results format
Run Code Online (Sandbox Code Playgroud)