Airflow 运行时的自定义操作员 XCom

Har*_*d17 3 airflow

我明白了,因为PythonOperator/BashOperator我们可以使用 Xcom 进行交流。

例如

def func(**context):
    context['task_instance'].xcom_pull()
Run Code Online (Sandbox Code Playgroud)

但是,我想知道如何xcom在运行时访问自定义运算符。

我的操作员看起来像这样:

class ECHOXOperator(BaseOperator):
    @apply_defaults
    def __init__(self, x, *args, **kwargs):
        self.x = x
        super(ECHOXOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(self.x)
Run Code Online (Sandbox Code Playgroud)

所以在我的 DAG 中:

我可以

task2 = ECHOXOperator(x = 'Hello")
Run Code Online (Sandbox Code Playgroud)

而且效果很好。但是如何从上游任务访问 x 呢?

就像是:

def task1(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key="x", value="Hello")

generate_data = PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)

task2 = ECHOXOperator(x = task_instance.xcom_pull('task1', 'x'), provide_context=True)

task1 >> task2
Run Code Online (Sandbox Code Playgroud)

这不起作用,因为task_instanceinECHOXOperator未定义。

谢谢

kax*_*xil 5

您应该在自定义运算符中x作为templated_fields传递。

class ECHOXOperator(BaseOperator):
    template_fields = ['x']

    @apply_defaults
    def __init__(self, x, *args, **kwargs):
        self.x = x
        super(ECHOXOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(self.x)
Run Code Online (Sandbox Code Playgroud)

现在您可以执行以下操作来获取上一个任务中传递的 Xcom 值:

def task1(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key="x", value="Hello")

generate_data = PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)

task2 = ECHOXOperator(x = "{{ ti.xcom_pull('task1', 'x') }}")

task1 >> task2
Run Code Online (Sandbox Code Playgroud)

有关templated_fields和 Jinja 模板的更多信息:https://airflow.readthedocs.io/en/latest/concepts.html#id1