Can I Variable.get() or xcom_pull() a variable in the MAIN part of an Airflow script (outside a PythonOperator)?

Asked by BPS · score 6 · tags: python, airflow

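I have a situation where I need to find a specific folder in S3 to pass to a PythonOperator in an Airflow script. I am doing this with another PythonOperator that finds the correct directory. I can successfully xcom_push() or Variable.set() the value and read it back within that PythonOperator. The problem is that I need to pass this variable to a separate PythonOperator that uses code from a Python library. Therefore, I need to Variable.get() or xcom_pull() this variable in the main part of the Airflow script. I have searched quite a bit and can't figure out whether this is possible. Below is some code for reference: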

    from airflow.models import Variable
    from airflow.operators.python_operator import PythonOperator

    def check_for_done_file(**kwargs):

        ### This function does a bunch of stuff to find the correct S3 path to
        ### populate target_dir, this has been verified and works

        Variable.set("target_dir", done_file_list.pop())
        test = Variable.get("target_dir")
        print("TEST: ", test)

    #### END OF METHOD, BEGIN MAIN

    with my_dag:

        ### CALLING METHOD FROM MAIN, POPULATING VARIABLE

        check_for_done_file_task = PythonOperator(
            task_id='check_for_done_file',
            python_callable=check_for_done_file,
            dag=my_dag,
            op_kwargs={
                "source_bucket": "my_source_bucket",
                "source_path": "path/to/the/s3/folder/I/need"
            }
        )

        target_dir = Variable.get("target_dir")  # I NEED THIS VAR HERE.

        move_data_to_in_progress_task = PythonOperator(
            task_id='move-from-incoming-to-in-progress',
            python_callable=FileOps.move,  # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
            dag=my_dag,
            op_kwargs={
                "source_bucket": "source_bucket",
                "source_path": "path/to/my/s3/folder/" + target_dir,
                "destination_bucket": "destination_bucket",
                "destination_path": "path/to/my/s3/folder/" + target_dir,
                "recurse": True
            }
        )

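So, is the only way to accomplish this to augment the library so that it looks up the "target_dir" Variable itself? I don't think the main part of an Airflow script has a context, so what I want to do may not be possible. Any Airflow experts, please weigh in and let me know what my options might be.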

Answer by kaxil · score 4

`op_kwargs` is a templated field, so you can push the value with `xcom_push`:

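    def check_for_done_file(**kwargs):
        ...
        # xcom_push() requires a key; ti.xcom_pull(task_ids=...) reads 'return_value'
        # by default (simply returning y from the callable has the same effect)
        kwargs['ti'].xcom_push(key='return_value', value=y)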
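`xcom_pull(task_ids=...)` reads the `return_value` key unless another `key` is given, and that is also the key under which a callable's return value is pushed, so the pushed key and the pulled key have to match. The stdlib-only sketch below models that lookup with a hypothetical in-memory `XComStore` class and a made-up folder name; it is not Airflow's implementation, just an illustration of the key matching.

```python
# Hypothetical, simplified model of Airflow's XCom lookup (not Airflow's code).
from typing import Any, Dict, Optional, Tuple

RETURN_KEY = "return_value"  # the key Airflow uses for a callable's return value


class XComStore:
    """In-memory stand-in for the XCom table, keyed by (task_id, key)."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        self._data[(task_id, key)] = value

    def pull(self, task_id: str, key: str = RETURN_KEY) -> Optional[Any]:
        # Like ti.xcom_pull(task_ids=...), default to the return_value key
        return self._data.get((task_id, key))


store = XComStore()

# Pushed under a custom key: a default pull does not see it.
store.push("check_for_done_file", "target_dir", "2020-01-01/batch_7")
print(store.pull("check_for_done_file"))                    # None
print(store.pull("check_for_done_file", key="target_dir"))  # 2020-01-01/batch_7

# Pushed under return_value: a default pull finds it.
store.push("check_for_done_file", RETURN_KEY, "2020-01-01/batch_7")
print(store.pull("check_for_done_file"))                    # 2020-01-01/batch_7
```

In a real DAG, returning the path from `check_for_done_file` is usually the simplest way to land it under `return_value`.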

Then use Jinja templating in `op_kwargs` to pull it:

    move_data_to_in_progress_task = PythonOperator(
        task_id='move-from-incoming-to-in-progress',
        python_callable=FileOps.move,  # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
        dag=my_dag,
        op_kwargs={
            "source_bucket": "source_bucket",
            "source_path": "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
            "destination_bucket": "destination_bucket",
            "destination_path": "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
            "recurse": True
        }
    )
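This works where a top-level `Variable.get()` does not because top-level DAG code runs whenever the scheduler parses the file, possibly before `check_for_done_file` has ever run, whereas templated fields are rendered only when the downstream task instance executes. The sketch below imitates that deferral with the standard library: `string.Template` placeholders stand in for Jinja expressions, and a hypothetical `render()` helper walks nested dicts the way templated `op_kwargs` are rendered. It is an illustration under those assumptions, not Airflow's rendering code.

```python
# Stdlib-only imitation of run-time template rendering (not Airflow's code).
from string import Template
from typing import Any, Dict

# Templated arguments are stored as unrendered strings when the DAG is parsed.
op_kwargs: Dict[str, Any] = {
    "source_bucket": "source_bucket",
    "source_path": "path/to/my/s3/folder/${target_dir}",
    "destination_bucket": "destination_bucket",
    "destination_path": "path/to/my/s3/folder/${target_dir}",
    "recurse": True,
}


def render(value: Any, context: Dict[str, Any]) -> Any:
    """Recursively substitute placeholders in strings, dicts and lists."""
    if isinstance(value, str):
        return Template(value).substitute(context)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    if isinstance(value, list):
        return [render(v, context) for v in value]
    return value  # non-string values such as True pass through unchanged


# Only at execution time, after the upstream task has pushed its XCom,
# is the value known and substituted.
upstream_result = "2020-01-01/batch_7"  # hypothetical folder found upstream
rendered = render(op_kwargs, {"target_dir": upstream_result})
print(rendered["source_path"])  # path/to/my/s3/folder/2020-01-01/batch_7
```

Note that `op_kwargs` itself is never mutated: each run renders a fresh copy, so a later run can pick up a different folder.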

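Also, add `provide_context=True` to your `check_for_done_file_task` so that the context dictionary (which includes `ti`) is passed to the callable. (This applies to Airflow 1.x; in Airflow 2 the context is passed automatically and `provide_context` is deprecated.)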
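Roughly speaking, with `provide_context=True` Airflow 1.x merges the task's context with `op_kwargs` before calling the callable, which is why `kwargs['ti']` exists inside `check_for_done_file`. The sketch below models that merge with a hypothetical `call_with_context()` helper and a stub `FakeTaskInstance`; both names and the lookup result are illustrative assumptions, not Airflow APIs.

```python
# Hypothetical model of how a context-aware callable receives `ti` (not Airflow's code).
from typing import Any, Callable, Dict, Optional


class FakeTaskInstance:
    """Stub standing in for Airflow's TaskInstance; records pushed XComs."""

    def __init__(self) -> None:
        self.pushed: Dict[str, Any] = {}

    def xcom_push(self, key: str, value: Any) -> None:
        self.pushed[key] = value


def call_with_context(func: Callable[..., Any], op_kwargs: Dict[str, Any],
                      context: Optional[Dict[str, Any]]) -> Any:
    # With context enabled, context entries and op_kwargs are merged into one
    # keyword dict (op_kwargs win on clashes); otherwise only op_kwargs are passed.
    kwargs = {**context, **op_kwargs} if context is not None else dict(op_kwargs)
    return func(**kwargs)


def check_for_done_file(**kwargs: Any) -> None:
    target = kwargs["source_path"] + "/2020-01-01"  # hypothetical lookup result
    kwargs["ti"].xcom_push(key="return_value", value=target)


ti = FakeTaskInstance()
call_with_context(
    check_for_done_file,
    {"source_bucket": "my_source_bucket", "source_path": "path/to/the/s3/folder/I/need"},
    {"ti": ti},
)
print(ti.pushed["return_value"])  # path/to/the/s3/folder/I/need/2020-01-01
```

Calling the same helper with `context=None` leaves `ti` out of `kwargs`, and the callable fails with a `KeyError`, which mirrors what happens in Airflow 1.x without `provide_context=True`.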