使用UI进行Airflow s3连接

Nik*_*ddy 35 python-3.x airflow

我一直在尝试使用Airflow来安排DAG.其中一个DAG包括从s3存储桶加载数据的任务.

出于上述目的,我需要设置s3连接.但是气流提供的UI并不是那么直观(http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections).任何人都成功建立了s3连接,如果有的话,你们所遵循的最佳做法是什么?

谢谢.

小智 62

很难找到参考文献,但经过挖掘后我才能使它工作.

TLDR

使用以下属性创建新连接:

Conn Id: my_conn_S3

Conn类型: S3

额外:

{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
Run Code Online (Sandbox Code Playgroud)

长版本,设置UI连接:

  • 在Airflow UI上,转至管理>连接
  • 使用以下属性创建新连接:Conn Id:my_conn_S3,Conn类型:S3,额外:{"aws_access_key_id":"_ your_aws_access_key_id_","aws_secret_access_key":"_ your_aws_secret_access_key_"}
  • 将所有其他字段(主机,架构,登录)留空.

要使用此连接,您可以在下面找到简单的S3传感器测试.这个测试的想法是建立一个在S3(T1任务)中监视文件的传感器,一旦满足条件,它就会触发一个bash命令(T2任务).

测试

  • 在运行DAG之前,请确保您有一个名为"S3-Bucket-To-Watch"的S3存储桶.
  • 将以下s3_dag_test.py添加到airflow dags文件夹(〜/ airflow/dags)
  • 开始my_conn_S3.
  • 转到Airflow UI(http:// localhost:8383 /)
  • 开始S3.
  • 在主DAG视图上打开's3_dag_test'DAG.
  • 选择"s3_dag_test"以显示dag详细信息.
  • 在图表视图上,您​​应该能够看到它的当前状态.
  • 'check_s3_for_file_in_s3'任务应该是活动的并且正在运行.
  • 现在,将一个名为"file-to-watch-1"的文件添加到"S3-Bucket-To-Watch"中.
  • 第一项任务应该已经完成​​,第二项任务应该开始并完成.

dag定义中的schedule_interval设置为'@once',以方便调试.

要再次运行它,保留所有内容,删除存储桶中的文件,然后选择第一个任务(在图表视图中)并选择"清除"所有"过去","未来","上游","下游"再次尝试....活动.这应该再次启动DAG.

让我知道它是怎么回事.

s3_dag_test.py;

"""
S3 Sensor Connection Test
"""

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor,   BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 1),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')

t1 = BashOperator(
    task_id='bash_test',
    bash_command='echo "hello, it should work" > s3_conn_test.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='check_s3_for_file_in_s3',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='S3-Bucket-To-Watch',
    s3_conn_id='my_conn_S3',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)
Run Code Online (Sandbox Code Playgroud)
主要参考文献:

  • 谢谢这很有帮助.在版本1.8.1+中,导入已经改变,例如使用`from airflow.operators.bash_operator import BashOperator`和`from airflow.operators.sensors import s3KeySensor`我也试图在服务器上找到文件`s3_conn_test.txt`并且它不在那里.我检查了日志,看起来脚本在/ tmp /的某个子目录中运行,随后在任务完成时将其删除,因此最好写入airflow用户有权访问的显式路径. (2认同)
  • @Davos它是一个大写字母S而不是S3KeySensor的小写字母. (2认同)
  • 我希望安塞尔莫能编辑这个答案,因为这不再是正确的方法。这会以纯文本形式公开密钥/密码。请参阅下面@Ash 的回答 (2认同)

小智 10

另一个对我有用的选项是将访问密钥作为“登录”,将密钥作为“密码”:

Conn Id: <arbitrary_conn_id>
Conn Type: S3
Login: <aws_access_key>
Password: <aws_secret_key>
Run Code Online (Sandbox Code Playgroud)

将所有其他字段留空。


小智 9

如果您担心在UI中公开凭据,另一种方法是在UI中的Extra param中传递凭证文件位置.只有功能用户才具有该文件的读权限.它看起来像下面的东西

Extra:  {
    "profile": "<profile_name>", 
    "s3_config_file": "/home/<functional_user>/creds/s3_credentials", 
    "s3_config_format": "aws" }
Run Code Online (Sandbox Code Playgroud)

文件" /home/<functional_user>/creds/s3_credentials"有以下条目

[<profile_name>]
aws_access_key_id = <access_key_id>
aws_secret_access_key = <secret_key>
Run Code Online (Sandbox Code Playgroud)


小智 8

假设气流托管在EC2服务器上。

只需按照其他答案创建连接,但除了连接类型(应保留为S3)之外,配置中的所有内容均保留为空白

S3hook将默认为boto,这将默认为您在其上运行气流的EC2服务器的角色。假设此角色拥有S3的权限,则您的任务将能够访问存储桶。

这是比使用和存储凭据更安全的方法。


Ash*_*lor 7

几个版本之前,我们已将此添加到我们的文档中:

http://airflow.apache.org/docs/stable/howto/connection/aws.html

AWS 连接和 S3 连接之间没有区别。

此处接受的答案在 extra/JSON 中有密钥和秘密,虽然它仍然有效(从 1.10.10 开始),但不再推荐,因为它在 UI 中以纯文本形式显示秘密。


小智 5

对于新版本,请更改上述示例中的 python 代码。

s3_conn_id='my_conn_S3'
Run Code Online (Sandbox Code Playgroud)

aws_conn_id='my_conn_s3'
Run Code Online (Sandbox Code Playgroud)