在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.
dag会触发工作吗?如果是这样,请你举个例子.
我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)
我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTaskSensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)
我正在尝试找出如何最好地解决以下问题。本质上,我有一个外部 API 服务,我向其发送请求并获取结果。
POST = 发送请求,您收到的响应是一个 URL,您可以使用该 URL 进行 GET 请求来检索结果。
GET = 轮询 POST 请求返回的 URL,直到获得成功结果。
在气流中解决这个问题的最佳方法是什么?我的想法是本质上有两个任务并行运行。
您认为这是正确的处理方式吗?或者我应该使用 python 中的 asyncio 库吗?
非常感谢任何帮助
谢谢,