小编Spa*_*Bot的帖子

气流的“并行”和“dag_concurrency”有什么区别

我不明白之间的差别dag_concurrencyparallelism。文档和这里的一些相关帖子在某种程度上与我的发现相矛盾。

我之前的理解是,该parallelism参数允许您设置气流中可能的全局(跨所有 DAG)TaskRundag_concurrency的最大数量,并表示单个 Dag 可能的 TaskRun 的最大数量。

所以我将 设置parallelism为 8 和dag_concurrency4 并运行一个 Dag。我发现它一次运行 8 个 TI,但我期望它一次运行 4 个。

  1. 这怎么可能?

  2. 此外,如果有帮助,我已将这些任务的池大小设置为 10 左右。但这应该无关紧要,因为“配置”参数的优先级高于池的优先级,对吗?

python airflow

9
推荐指数
2
解决办法
3589
查看次数

考虑到 RAM 和 CPU 的限制,如何使用 Airflow 主动控制 DAG

通过尝试大量示例,我对气流的编程功能非常熟悉。阻止我进一步挖掘的是它如何在不超载 CPU 或 RAM 的情况下执行其工作,是否有一种方法可以控制负载,使其不会过载资源耗尽

我知道一种减少负载的方法,当调度程序执行“更频繁地调度和挑选文件”的工作时,通过将以下字段 min_file_process_interval 和 Scheduler_heartbeat_sec 的值更改为一分钟左右的间隔。虽然它减少了持续的 CPU 上升,但是当间隔过去时(即一分钟后),它会突然恢复到像启动期间一样占用约 95% 的 CPU。你如何减少它呢?至少不会消耗超过 70% 的 CPU ?

编辑:

另外,当scheduler_heartbeat间隔过去时,我看到我的所有python脚本再次执行......这是它的工作方式吗?我认为它会在间隔之后拾取新的 DAG(如果有),否则不会执行任何操作。

python load airflow

5
推荐指数
1
解决办法
1574
查看次数

我不能通过 BashOperator xcom_push 参数

我是 Airflow 的 xcom 功能的新手。我用 PythonOperator 进行了尝试,它工作正常(即,我可以将值从上下文中推入和拉出),但是当我在 BashOperator 上尝试时,它不起作用。但是,我只能通过在任务创建期间添加 xcom_push=True 属性来提取最终的 stdout 语句。这是一回事。2)但我也希望像我们在 PythonOp 中那样根据它们的键(到 BashOp 和从 BashOp)推送和拉取值。这真的很有帮助,因为我需要将大量变量从一个脚本传递到其他。

airflow apache-airflow-xcom

1
推荐指数
1
解决办法
5739
查看次数

标签 统计

airflow ×3

python ×2

apache-airflow-xcom ×1

load ×1