相关疑难解决方法(0)

使用UI进行Airflow s3连接

我一直在尝试使用Airflow来安排DAG.其中一个DAG包括从s3存储桶加载数据的任务.

出于上述目的,我需要设置s3连接.但是气流提供的UI并不是那么直观(http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections).任何人都成功建立了s3连接,如果有的话,你们所遵循的最佳做法是什么?

谢谢.

python-3.x airflow

35
推荐指数
6
解决办法
2万
查看次数

Airflow S3KeySensor - 如何让它继续运行

这个Stackoverflow帖子的帮助下,我刚刚创建了一个程序(帖子中显示的那个),当一个文件放在S3存储桶中时,我的一个正在运行的DAG中的任务被触发,然后我使用BashOperator执行一些工作.一旦完成,虽然DAG不再处于运行状态,而是进入成功状态,如果我想让它拿起另一个文件,我需要清除所有"过去","未来","上游",下游'活动.我想制作这个程序,以便它始终在运行,并且只要在S3存储桶中放置一个新文件,程序就会启动任务.

我可以继续使用S3KeySenor执行此操作,还是需要设置一种设置外部触发器来运行DAG的方法?截至目前,我的S3KeySensor如果只运行一次就毫无意义.

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')

# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
    task_id='create_emr_cluster_1', …
Run Code Online (Sandbox Code Playgroud)

boto3 airflow airflow-scheduler

14
推荐指数
1
解决办法
5258
查看次数

标签 统计

airflow ×2

airflow-scheduler ×1

boto3 ×1

python-3.x ×1