如何在 Airflow 中将 MySqlOperator 与 xcom 一起使用?

Pro*_*120 3 airflow

我读过这篇How to use airflow xcoms with MySqlOperator,虽然它有一个类似的标题,但它并没有真正解决我的问题。

我有以下代码:

def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    if int(xcom) > int(LAST_IMPORTED_ORDER_ID)
        return 'import_orders'
    else:
        return 'skip_operation'

query_get_max_order_id  = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = MySqlOperator(
        task_id='query_get_max_order_id',
        sql= query_get_max_order_id,
        mysql_conn_id=MyCon,
        xcom_push=True,
        dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
Run Code Online (Sandbox Code Playgroud)

MySqlOperator根据选择下一个任务的编号返回一个编号BranchPythonOperator。保证MySqlOperator返回的值大于0

我的问题是,当我转到“我什么也没看到”时,“在用户界面上”没有推送XCOM任何MySqlOperator 内容XCOM。显然BranchPythonOperator什么也没读到,所以我的代码失败了。

为什么XCOM这里不起作用?

Ash*_*lor 5

目前,MySQL 运算符(撰写本文时为airflow 1.10.0)不支持在 XCom 中返回任何内容,因此目前的修复方法是自己编写一个小型运算符。您可以直接在 DAG 文件中执行此操作(未经测试,因此可能会出现愚蠢的错误):

from airflow.operators.mysql_operator import MySqlOperator as BaseMySqlOperator
from airflow.hooks.mysql_hook import MySqlHook

class ReturningMySqlOperator(BaseMySqlOperator):
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        return hook.get_first(
            self.sql,
            parameters=self.parameters)


def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    if str(xcom) == 'NewRecords':
        return 'import_orders'
    else:
        return 'skip_operation'

query_get_max_order_id  = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = ReturningMySqlOperator(
        task_id='query_get_max_order_id',
        sql= query_get_max_order_id,
        mysql_conn_id=MyCon,
        # xcom_push=True,
        dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
Run Code Online (Sandbox Code Playgroud)