如何解析气流模板中的json字符串

har*_*nsa 4 airflow

是否可以在气流模板中解析 JSON 字符串?

我有一个 HttpSensor,它通过 REST API 监视作业,但作业 ID 是在上游任务的响应中,该任务已xcom_push标记为True.

我想做类似以下的事情,但是,此代码给出了错误 jinja2.exceptions.UndefinedError: 'json' is undefined

t1 = SimpleHttpOperator(
    http_conn_id="s1",
    task_id="job",
    endpoint="some_url",
    method='POST',
    data=json.dumps({ "foo": "bar" }),
    xcom_push=True,
    dag=dag,
)

t2 = HttpSensor(
    http_conn_id="s1",
    task_id="finish_job",
    endpoint="job/{{ json.loads(ti.xcom_pull(\"job\")).jobId }}",
    response_check=lambda response: True if response.json().state == "complete" else False,
    poke_interval=5,
    dag=dag
)

t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

Dan*_*ang 12

您可以使用参数user_defined_filters向DAG 添加自定义 Jinja 过滤器以解析 json。

将在您的 jinja 模板中公开的过滤器字典。例如,传递 dict(hello=lambda name: 'Hello %s' % name)给此参数允许您{{ 'world' | hello }}在与此 DAG 相关的所有 jinja 模板中。

dag = DAG(
    ...
    user_defined_filters={'fromjson': lambda s: json.loads(s)},
)

t1 = SimpleHttpOperator(
    task_id='job',
    xcom_push=True,
    ...
)

t2 = HttpSensor(
    endpoint='job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}',
    ...
)
Run Code Online (Sandbox Code Playgroud)

但是,编写自己的自定义JsonHttpOperator 插件(或向 中添加标志SimpleHttpOperator)在返回之前解析 JSON以便您可以直接{{ti.xcom_pull("job")["jobId"]在模板中引用可能更清晰。

class JsonHttpOperator(SimpleHttpOperator):

    def execute(self, context):
        text = super(JsonHttpOperator, self).execute(context)
        return json.loads(text)
Run Code Online (Sandbox Code Playgroud)


har*_*nsa 6

或者,也可以json通过这样做将模块添加到模板中,并且 json 将可用于模板内部。然而,像丹尼尔所说的那样创建一个插件可能是一个更好的主意。

dag = DAG(
    'dagname',
    default_args=default_args,
    schedule_interval="@once",
    user_defined_macros={
        'json': json
    }
)
Run Code Online (Sandbox Code Playgroud)

然后

finish_job = HttpSensor(
    task_id="finish_job",
    endpoint="kue/job/{{ json.loads(ti.xcom_pull('job'))['jobId'] }}",
    response_check=lambda response: True if response.json()['state'] == "complete" else False,
    poke_interval=5,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)