Using python string substitution and xcom_pull in Airflow HttpSensor operator

ban*_*ing 4 airflow apache-airflow-xcom

I have a use-case where Im inside a for loop and need to populate fields in an HttpSensor task dynamically

I tried using this syntax:

Method 1 FAILED:

s = 'sensor_task_sd_{0}'.format(d)
            sensor_task_sd = HttpSensor(
                task_id=s,
                http_conn_id='ss_api',
                endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t),
                request_params={'X-Requested-By': 'abc_123'},
                response_check=lambda response: True if "FINISHED" in response.text else False,
                poke_interval=10,
                soft_fail=True,
                timeout=600,
                dag=dag_subdag,
                )
Run Code Online (Sandbox Code Playgroud)

but it fails because in this line:

endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t)
Run Code Online (Sandbox Code Playgroud)

I cannot make the python string substitution with .format(t) work.

Instead if I hard code some value the above code works... for example, the code below works fine:

Method 2 SUCCESS:

s = 'sensor_task_sd_{0}'.format(d)
sensor_task_sd = HttpSensor(
               task_id=s,
               http_conn_id='ss_api',
        endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids='start_pipeline_sd_campaignhistory')}}/status?rev=0",
        request_params={'X-Requested-By': 'abc_123'},
               response_check=lambda response: True if "FINISHED" in response.text else False,
               poke_interval=10,
               soft_fail=True,
               timeout=600,
               dag=dag_subdag)
Run Code Online (Sandbox Code Playgroud)

I just donot understand this bit..... Ive tried every combination of tricks to make it work, it just doesnt take the string interpolations which Iam using to keep the code dynamic.

so my question is very simple:

How can I make the HttpSensor operator dynamic? I donot want to hard code my function values within the endpoint string(Method 2 style), I would like to use values that are set at run time (Method 1 style).

How can I make Method 1 work?

Lia*_*rke 8

所以 Airflow 使用 Jinja 来模板化它的字符串,当你混合 Jinja 模板和 Python 格式时,你需要“转义”Jinja 需要的大括号,这样 Python 格式就不会消耗它们。您可以通过将不用于.format()调用的每个大括号加倍来实现。

这应该会给你你需要的结果。

endpoint = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={}) }}}}/status?rev=0".format(t)
Run Code Online (Sandbox Code Playgroud)

顺便说一句,根据我使用 f-strings (Python 3.6+) 或命名格式参数的经验,如果您能够在 Airflow 脚本中混合两者时,确实可以帮助代码清晰。但是您仍然需要“转义”大括号。

f 字符串:

endpoint = f"/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={t})}} }}/status?rev=0"
Run Code Online (Sandbox Code Playgroud)

命名格式参数:

endpoint = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={task_id}) }}}}/status?rev=0".format(task_id=t)
Run Code Online (Sandbox Code Playgroud)

希望有帮助:)