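I have a situation where I need to find a specific folder in S3 and pass it to a PythonOperator in an Airflow script. I do this with another PythonOperator that finds the correct directory. I can successfully call xcom_push() or Variable.set() and read the value back inside that PythonOperator. The problem is that I need to pass this variable to a separate PythonOperator that uses code from a Python library. So I need to Variable.get() or xcom_pull() this variable in the main part of the Airflow script. I have searched a lot and can't figure out whether this is possible. Here is some code for reference: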
def check_for_done_file(**kwargs):
    ### This function does a bunch of stuff to find the correct S3 path to
    ### populate target_dir, this has been verified and works
    Variable.set("target_dir", done_file_list.pop())
    test = Variable.get("target_dir")
    print("TEST: ", test)

#### END OF METHOD, BEGIN MAIN
with my_dag:
    ### CALLING METHOD FROM MAIN, POPULATING VARIABLE
    check_for_done_file_task = PythonOperator(
        task_id = 'check_for_done_file',
        python_callable = check_for_done_file,
        dag = my_dag,
        op_kwargs = {
            "source_bucket" : "my_source_bucket",
            "source_path" : "path/to/the/s3/folder/I/need"
        }
    )

    target_dir = Variable.get("target_dir") # I NEED THIS VAR HERE.

    move_data_to_in_progress_task = PythonOperator(
        task_id = 'move-from-incoming-to-in-progress',
        python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
        dag = my_dag,
        op_kwargs = {
            "source_bucket" : "source_bucket",
            "source_path" : "path/to/my/s3/folder/" + target_dir,
            "destination_bucket" : "destination_bucket",
            "destination_path" : "path/to/my/s3/folder/" + target_dir,
            "recurse" : True
        }
    )
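So, is the only way to accomplish this to augment the library so that it looks up the "target_dir" variable itself? I don't think the main part of an Airflow script has a context, so what I want to do may not be possible. Any Airflow experts, please weigh in and let me know what my options might be.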
op_kwargs is a templated field, so you can push the value with xcom_push:
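def check_for_done_file(**kwargs):
    ...
    # xcom_push requires a key; 'return_value' is the key xcom_pull reads by default
    kwargs['ti'].xcom_push(key='return_value', value=y)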
and use a Jinja template in op_kwargs:
move_data_to_in_progress_task = PythonOperator(
    task_id = 'move-from-incoming-to-in-progress',
    python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
    dag = my_dag,
    op_kwargs = {
        "source_bucket" : "source_bucket",
        "source_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
        "destination_bucket" : "destination_bucket",
        "destination_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
        "recurse" : True
    }
)
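Also, add provide_context=True to your check_for_done_file_task task so that the context dictionary is passed to the callable. (This applies to Airflow 1.x; in Airflow 2 the context is passed automatically.)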
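If you would rather not rely on templating, another option is a thin wrapper callable that pulls the XCom at run time and then calls the library function, so the library itself stays untouched. Below is a minimal sketch of that idea, not a definitive implementation. The names make_move_callable, FakeTaskInstance and fake_move are hypothetical and exist only for illustration, the directory value "example_dir" is made up, and I am assuming FileOps.move accepts the keyword arguments shown in the question. The stand-in classes let the sketch run without Airflow installed.

```python
def make_move_callable(move, base_path, source_bucket, destination_bucket):
    """Build a callable for PythonOperator that resolves target_dir at run time.

    `move` is assumed to accept the keyword arguments used in the question
    (source_bucket, source_path, destination_bucket, destination_path, recurse).
    """
    def _run(**context):
        # With only task_ids given, xcom_pull reads the 'return_value' XCom
        # of the upstream task.
        target_dir = context["ti"].xcom_pull(task_ids="check_for_done_file")
        path = base_path + target_dir
        return move(
            source_bucket=source_bucket,
            source_path=path,
            destination_bucket=destination_bucket,
            destination_path=path,
            recurse=True,
        )
    return _run


# --- Stand-ins so the sketch runs outside Airflow (hypothetical, demo only) ---
class FakeTaskInstance:
    """Mimics the one TaskInstance method the wrapper uses."""
    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids, key="return_value"):
        return self._xcoms[(task_ids, key)]


def fake_move(**kwargs):
    """Stands in for FileOps.move; just echoes what it was called with."""
    return kwargs


run = make_move_callable(
    fake_move, "path/to/my/s3/folder/", "source_bucket", "destination_bucket"
)
ti = FakeTaskInstance({("check_for_done_file", "return_value"): "example_dir"})
result = run(ti=ti)
print(result["source_path"])
```

In the DAG you would then pass python_callable=make_move_callable(FileOps.move, ...) to the second PythonOperator (with provide_context=True on Airflow 1.x) and drop the op_kwargs entries that depended on target_dir.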