I should be able to create a single node, Group1, to satisfy the throttling requirement, and also have ...

How can I implement such a hierarchy in Airflow for a Spring Boot Java application? Is it possible to design this kind of DAG using Airflow constructs, and to dynamically tell the Java application how many tables it may extract at a time? For example, if all workers except Worker1 have finished, Worker1 can now use all 5 available threads, while everything else moves on to step2.
These constraints cannot be modeled as a directed acyclic graph, so they cannot be implemented in Airflow exactly as described. However, they can be modeled as queues, and can therefore be implemented with a job queue framework. Here are your two options:
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
# Executors that inherit from BaseExecutor take a parallelism parameter
from wherever import SomeExecutor, SomeOperator

# Table load jobs are done with parallelism 5
load_tables_dag = DAG("load_tables")
load_tables = SubDagOperator(
    task_id="load_tables",
    subdag=load_tables_dag,
    executor=SomeExecutor(parallelism=5),
)

# Each table load must be its own task, or must be split into sets of tables
# of predetermined size, such that num_tables_per_job * parallelism = 5
for table in tables:
    load_table = SomeOperator(task_id=f"load_table_{table}", dag=load_tables_dag)

# Jobs done afterwards are done with higher parallelism
afterwards_dag = DAG("afterwards")
afterwards = SubDagOperator(
    task_id="afterwards",
    subdag=afterwards_dag,
    executor=SomeExecutor(parallelism=high_parallelism),
)
for job in jobs:
    afterward_job = SomeOperator(task_id=f"job_{job}", dag=afterwards_dag)

# After _all_ table load jobs are complete, start the jobs that should be done afterwards
load_tables >> afterwards
The suboptimal part here is that, for the first half of the DAG, the cluster will be underutilized by higher_parallelism - 5.
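As for the question's other ask, dynamically telling the Java application how many tables it may extract at a time, each task in the throttled SubDAG could simply pass its share of the budget to the Spring Boot application on the command line. A minimal sketch using Airflow's BashOperator; the jar name and the --table / --max-threads flags are hypothetical, not part of the original answer:

from airflow.operators.bash_operator import BashOperator

# Hypothetical invocation of the Spring Boot ETL jar; jar name and flags
# are illustrative assumptions
load_customers = BashOperator(
    task_id="load_table_customers",
    bash_command="java -jar etl.jar --table customers --max-threads 1",
    dag=load_tables_dag,
)

With parallelism=5 on the executor and one thread per task, at most 5 tables are extracted concurrently.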
# This is pseudocode, but could be easily adapted to a framework like Celery.
# You need two queues.
# The table load queue should be initialized with the table load job items
table_load_queue = Queue(initialize_with_tables)
# The queue for jobs to do afterwards starts empty
afterwards_queue = Queue()

def worker():
    # Work while there's at least one item in either queue
    while not table_load_queue.empty() or not afterwards_queue.empty():
        working_on_table_load = [
            w.is_working_table_load for w in scheduler.active()
        ]
        # Work table loads if we haven't reached capacity,
        # otherwise work the jobs that come afterwards
        if sum(working_on_table_load) < 5:
            is_working_table_load = True
            task = table_load_queue.dequeue()
        else:
            is_working_table_load = False
            task = afterwards_queue.dequeue()
        if task:
            after = work(task)
            if is_working_table_load:
                # After working a table load, enqueue the job to work afterwards
                afterwards_queue.enqueue(after)

# Use all the parallelism available
scheduler.start(worker, num_workers=high_parallelism)
With this approach, the cluster will not be underutilized.
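For concreteness, here is a minimal runnable adaptation of that pseudocode using only the Python standard library: queue.Queue and a counting threading.Semaphore stand in for the job queue framework, and load_table / run_afterwards_job are hypothetical work functions. Note that, as in the pseudocode, the exit condition can race if a table load is still in flight while both queues are momentarily empty.

import queue
import threading

TABLE_LOAD_CAPACITY = 5   # at most 5 concurrent table loads
HIGH_PARALLELISM = 8      # total worker threads (assumed value)

table_load_queue = queue.Queue()
afterwards_queue = queue.Queue()
for t in ["t1", "t2", "t3", "t4", "t5", "t6"]:
    table_load_queue.put(t)

# Counting semaphore enforces the table load throttle
load_slots = threading.Semaphore(TABLE_LOAD_CAPACITY)

def load_table(table):
    # Placeholder for the real extraction work
    return f"afterwards_job_for_{table}"

def run_afterwards_job(job):
    print(f"running {job}")

def worker():
    while not table_load_queue.empty() or not afterwards_queue.empty():
        # Prefer a table load if a throttle slot is free
        if load_slots.acquire(blocking=False):
            try:
                table = table_load_queue.get_nowait()
            except queue.Empty:
                load_slots.release()
            else:
                # After loading a table, enqueue its follow-up job
                afterwards_queue.put(load_table(table))
                load_slots.release()
                continue
        # Otherwise (or if no tables are left) work the follow-up jobs
        try:
            job = afterwards_queue.get_nowait()
        except queue.Empty:
            continue
        run_afterwards_job(job)

threads = [threading.Thread(target=worker) for _ in range(HIGH_PARALLELISM)]
for th in threads:
    th.start()
for th in threads:
    th.join()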