将字符串列表作为 Airflow 中依赖任务的参数传递

Gui*_*lle 5 python airflow

我正在尝试通过XCom将字符串列表从一个任务传递到另一个任务,但我似乎无法将推送的列表解释为列表。

例如,当我在某个blah在 a 中运行的函数中执行此操作时ShortCircuitOperator

paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=full_paths)
Run Code Online (Sandbox Code Playgroud)

然后我想使用这样的列表作为运算符的参数。例如,

run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah',
    ...,
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
    ...,
)
Run Code Online (Sandbox Code Playgroud)

我希望input_paths等于paths但不是因为渲染首先发生然后分配,并且模板渲染在某种程度上将xcom_pull返回转换为字符串化列表(此后我的AfterBlahOperator插入将其分配为 JSON 中元素的值。

我尝试将其连接paths成一个由某个分隔符分隔的字符串,然后将其推送到 XCom,然后在从 XCom 中拉出时将其拆分回来,但是由于 XCom 首先呈现,我得到了在模板中调用函数时的字符串化列表split或者paths如果split函数应用于参数的原始连接字符串(如"{{ ti.xcom_pull(task_ids='find-paths') }}".split(';').

当提取的值可以进一步处理时,XCom 似乎非常适合作为任务参数的单个值或多个值,但不能将 multiple_values 转换为“一个”作为任务的参数。

有没有办法做到这一点而不必编写一个额外的函数来精确返回这样的字符串列表?或者也许我滥用 XCom 太多了,但是 Airflow 中有许多操作符将元素列表作为参数(例如,通常是多个文件的完整路径,这些文件是先前任务的结果,因此事先不知道)。

Dan*_*ang 8

Jinja 渲染字符串,所以如果你通过模板获取 XCom,它总是一个字符串。相反,您将需要获取您有权访问该TaskInstance对象的 XCom 。像这样的东西:

class AfterBlahOperator(BaseOperator):

    def __init__(self, ..., input_task_id, *args, **kwargs):
        ...
        self.input_task_id = input_task_id
        super(AfterBlahOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
        for path in input_paths:
            ...
Run Code Online (Sandbox Code Playgroud)

这类似于您在 a 中获取它的方式PythonOperatorXCom 文档提供了一个示例。

请注意,input_paths当它可以在 DAG 中进行硬编码时,您仍然可以支持单独的参数,您只需要额外检查以查看从哪个参数读取值。