相关疑难解决方法(0)

在Airflow中创建动态工作流的正确方法

问题

在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.

dag会触发工作吗?如果是这样,请你举个例子.

我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)

想法#1

我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTask​​Sensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)

编辑1: …

python workflow airflow

66
推荐指数
8
解决办法
2万
查看次数

Airflow - 如何处理异步 API 调用?

我正在尝试找出如何最好地解决以下问题。本质上,我有一个外部 API 服务,我向其发送请求并获取结果。

POST = 发送请求,您收到的响应是一个 URL,您可以使用该 URL 进行 GET 请求来检索结果。

GET = 轮询 POST 请求返回的 URL,直到获得成功结果。

在气流中解决这个问题的最佳方法是什么?我的想法是本质上有两个任务并行运行。

  1. 一种发送 POST 请求,然后将响应 URL 保存到 XCOM。
  2. 另一个将在 while 循环中持续运行,从 XCOM 存储中读取新的 URL 响应并获取响应。一旦从该 URL 成功检索到结果,它就会从 XCOM 存储中删除。

您认为这是正确的处理方式吗?或者我应该使用 python 中的 asyncio 库吗?

非常感谢任何帮助

谢谢,

airflow

5
推荐指数
1
解决办法
3027
查看次数

标签 统计

airflow ×2

python ×1

workflow ×1