som*_*ode 1 amazon-ecs airflow
我在代码中定义了以下 DAG:
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from airflow.contrib.operators.ecs_operator import ECSOperator
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': datetime(2018, 9, 24, 10, 00, 00)
}
dag = DAG(
'data-push',
default_args=default_args,
schedule_interval='0 0 * * 1,4',
)
colors = ['blue', 'red', 'yellow']
for color in colors:
ECSOperator(dag=dag,
task_id='data-push-for-%s' % (color),
task_definition= 'generic-push-colors',
cluster= 'MY_ECS_CLUSTER_ARN',
launch_type= 'FARGATE',
overrides={
'containerOverrides': [
{
'name': 'push-colors-container',
'command': [color]
}
]
},
region_name='us-east-1',
network_configuration={
'awsvpcConfiguration': {
'securityGroups': ['MY_SG'],
'subnets': ['MY_SUBNET'],
'assignPublicIp': "ENABLED"
}
},
)
Run Code Online (Sandbox Code Playgroud)
这应该创建一个包含 3 个任务的 DAG,每个任务对应我的颜色列表中的每种颜色。
这看起来不错,当我跑步时:
airflow list_dags
Run Code Online (Sandbox Code Playgroud)
我看到我的达格列出了:
data-push
Run Code Online (Sandbox Code Playgroud)
当我跑步时:
airflow list_tasks data-push
Run Code Online (Sandbox Code Playgroud)
我看到我的三项任务按其应有的方式出现:
data-push-for-blue
data-push-for-red
data-push-for-yellow
Run Code Online (Sandbox Code Playgroud)
然后,我通过在终端中输入以下内容来测试运行我的任务之一:
airflow run data-push data-push-for-blue 2017-1-23
Run Code Online (Sandbox Code Playgroud)
这会运行该任务,我可以看到该任务出现在 aws 仪表板上的 ECS 集群中,因此我知道该任务在我的 ECS 集群上运行,并且数据已成功推送,一切都很好。
现在,当我尝试从 Airflow UI 运行 DAG 数据推送时,我遇到了问题。
我跑:
airflow initdb
Run Code Online (Sandbox Code Playgroud)
其次是:
airflow webserver
Run Code Online (Sandbox Code Playgroud)
现在进入 localhost:8080 的气流 UI。
我在 dag 列表中看到 dag 数据推送,单击它,然后为了测试运行整个 dag,我单击“触发 DAG”按钮。我不添加任何配置 json,然后单击“触发器”。然后,DAG 的树视图在树结构的右侧显示一个绿色圆圈,似乎表明 DAG 正在“运行”。但绿色圆圈会在那里停留很长时间,当我手动检查 ECS 仪表板时,我发现没有实际运行的任务,因此从 Airflow UI 触发 DAG 后什么也没有发生,尽管当我从 CLI 手动运行这些任务时任务正在运行。
如果这很重要的话,我正在使用 SequentialExecutor 。
关于为什么在从 CLI 运行单个任务时触发 DAG 没有任何作用,我的两个主要理论是,也许我在定义 dag 的 python 代码中遗漏了一些东西(也许是因为我没有为任务指定任何依赖项? )或者我没有运行气流调度程序,但如果我从气流 UI 手动触发 DAGS,我不明白为什么调度程序需要运行,也不明白为什么它不会向我显示错误,指出这是一个问题。
有任何想法吗?
| 归档时间: |
|
| 查看次数: |
1924 次 |
| 最近记录: |