小编Har*_*d17的帖子

Airflow 运行时的自定义操作员 XCom

我明白了,因为PythonOperator/BashOperator我们可以使用 Xcom 进行交流。

例如

def func(**context):
    context['task_instance'].xcom_pull()
Run Code Online (Sandbox Code Playgroud)

但是,我想知道如何xcom在运行时访问自定义运算符。

我的操作员看起来像这样:

class ECHOXOperator(BaseOperator):
    @apply_defaults
    def __init__(self, x, *args, **kwargs):
        self.x = x
        super(ECHOXOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(self.x)
Run Code Online (Sandbox Code Playgroud)

所以在我的 DAG 中:

我可以

task2 = ECHOXOperator(x = 'Hello")
Run Code Online (Sandbox Code Playgroud)

而且效果很好。但是如何从上游任务访问 x 呢?

就像是:

def task1(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key="x", value="Hello")

generate_data = PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)

task2 = ECHOXOperator(x = task_instance.xcom_pull('task1', 'x'), provide_context=True)

task1 >> task2
Run Code Online (Sandbox Code Playgroud)

这不起作用,因为task_instanceinECHOXOperator未定义。

谢谢

airflow

3
推荐指数
1
解决办法
1620
查看次数

标签 统计

airflow ×1