Airflow - 如何处理异步 API 调用?

ada*_*n11 5 airflow

我正在尝试找出如何最好地解决以下问题。本质上,我有一个外部 API 服务,我向其发送请求并获取结果。

POST = 发送请求,您收到的响应是一个 URL,您可以使用该 URL 进行 GET 请求来检索结果。

GET = 轮询 POST 请求返回的 URL,直到获得成功结果。

在气流中解决这个问题的最佳方法是什么?我的想法是本质上有两个任务并行运行。

  1. 一种发送 POST 请求,然后将响应 URL 保存到 XCOM。
  2. 另一个将在 while 循环中持续运行,从 XCOM 存储中读取新的 URL 响应并获取响应。一旦从该 URL 成功检索到结果,它就会从 XCOM 存储中删除。

您认为这是正确的处理方式吗?或者我应该使用 python 中的 asyncio 库吗?

非常感谢任何帮助

谢谢,

Nic*_*coE 3

您可以使用 Airflow 实现您所描述的内容SimpleHttpOperatorHttpSensor无需安装任何额外的软件包)。

考虑这个使用http_default连接到http bin 的示例。

执行POST请求的任务:

task_post_op = SimpleHttpOperator(
    task_id='post_op',
    # http_conn_id='your_conn_id',
    endpoint='post',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.json()['json']['priority'] == 5,
    response_filter=lambda response: 'get', # e.g  lambda response: json.loads(response.text)
    dag=dag, 
)

Run Code Online (Sandbox Code Playgroud)

通过提供,response_filter您可以操纵响应结果,这将是推送到 的值XCom。在您的情况下,您应该在下一个任务中返回要轮询的端点。

response_filter:允许您操作响应文本的函数。例如,response_filter=lambda 响应:json.loads(response.text)。可调用函数将响应对象作为第一个位置参数,并且可以选择上下文字典中可用的任意数量的关键字参数。:type response_filter: lambda 或定义的函数。

请注意,response_check参数是可选的。

执行 GET 请求的任务:

使用HttpSensor进行探测,直到response_check可调用的计算结果为 true。

task_http_sensor_check = HttpSensor(
    task_id='http_sensor_check',
    # http_conn_id='your_conn_id',
    endpoint=task_post_op.output, 
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=5,
    dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

作为参数,我们使用XComArgendpoint传递从上一个任务中提取的 XCom 值。用于定义作业在每次尝试之间应等待的时间(以秒为单位)。poke_interval

请记住创建您自己的连接,定义基本 URL、端口等。

让我知道这是否对你有用!