我明白了,因为PythonOperator/BashOperator我们可以使用 Xcom 进行交流。
例如
def func(**context):
context['task_instance'].xcom_pull()
Run Code Online (Sandbox Code Playgroud)
但是,我想知道如何xcom在运行时访问自定义运算符。
我的操作员看起来像这样:
class ECHOXOperator(BaseOperator):
@apply_defaults
def __init__(self, x, *args, **kwargs):
self.x = x
super(ECHOXOperator, self).__init__(*args, **kwargs)
def execute(self, context):
print(self.x)
Run Code Online (Sandbox Code Playgroud)
所以在我的 DAG 中:
我可以
task2 = ECHOXOperator(x = 'Hello")
Run Code Online (Sandbox Code Playgroud)
而且效果很好。但是如何从上游任务访问 x 呢?
就像是:
def task1(**context):
task_instance = context['task_instance']
task_instance.xcom_push(key="x", value="Hello")
generate_data = PythonOperator(
task_id="task1",
python_callable=task1,
dag=dag,
)
task2 = ECHOXOperator(x = task_instance.xcom_pull('task1', 'x'), provide_context=True)
task1 >> task2
Run Code Online (Sandbox Code Playgroud)
这不起作用,因为task_instanceinECHOXOperator未定义。
谢谢
airflow ×1