我正在尝试找出如何最好地解决以下问题。本质上,我有一个外部 API 服务,我向其发送请求并获取结果。
POST = 发送请求,您收到的响应是一个 URL,您可以使用该 URL 进行 GET 请求来检索结果。
GET = 轮询 POST 请求返回的 URL,直到获得成功结果。
在气流中解决这个问题的最佳方法是什么?我的想法是本质上有两个任务并行运行。
您认为这是正确的处理方式吗?或者我应该使用 python 中的 asyncio 库吗?
非常感谢任何帮助
谢谢,
您可以使用 Airflow 实现您所描述的内容SimpleHttpOperator(HttpSensor无需安装任何额外的软件包)。
考虑这个使用http_default连接到http bin 的示例。
执行POST请求的任务:
task_post_op = SimpleHttpOperator(
task_id='post_op',
# http_conn_id='your_conn_id',
endpoint='post',
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: response.json()['json']['priority'] == 5,
response_filter=lambda response: 'get', # e.g lambda response: json.loads(response.text)
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
通过提供,response_filter您可以操纵响应结果,这将是推送到 的值XCom。在您的情况下,您应该在下一个任务中返回要轮询的端点。
response_filter:允许您操作响应文本的函数。例如,response_filter=lambda 响应:json.loads(response.text)。可调用函数将响应对象作为第一个位置参数,并且可以选择上下文字典中可用的任意数量的关键字参数。:type response_filter: lambda 或定义的函数。
请注意,response_check参数是可选的。
执行 GET 请求的任务:
使用HttpSensor进行探测,直到response_check可调用的计算结果为 true。
task_http_sensor_check = HttpSensor(
task_id='http_sensor_check',
# http_conn_id='your_conn_id',
endpoint=task_post_op.output,
request_params={},
response_check=lambda response: "httpbin" in response.text,
poke_interval=5,
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
作为参数,我们使用XComArgendpoint传递从上一个任务中提取的 XCom 值。用于定义作业在每次尝试之间应等待的时间(以秒为单位)。poke_interval
请记住创建您自己的连接,定义基本 URL、端口等。
让我知道这是否对你有用!
| 归档时间: |
|
| 查看次数: |
3027 次 |
| 最近记录: |