Nik*_*ddy 35 python-3.x airflow
我一直在尝试使用Airflow来安排DAG.其中一个DAG包括从s3存储桶加载数据的任务.
出于上述目的,我需要设置s3连接.但是气流提供的UI并不是那么直观(http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections).任何人都成功建立了s3连接,如果有的话,你们所遵循的最佳做法是什么?
谢谢.
小智 62
很难找到参考文献,但经过挖掘后我才能使它工作.
使用以下属性创建新连接:
Conn Id: my_conn_S3
Conn类型: S3
额外:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
Run Code Online (Sandbox Code Playgroud)
要使用此连接,您可以在下面找到简单的S3传感器测试.这个测试的想法是建立一个在S3(T1任务)中监视文件的传感器,一旦满足条件,它就会触发一个bash命令(T2任务).
my_conn_S3.S3.dag定义中的schedule_interval设置为'@once',以方便调试.
要再次运行它,保留所有内容,删除存储桶中的文件,然后选择第一个任务(在图表视图中)并选择"清除"所有"过去","未来","上游","下游"再次尝试....活动.这应该再次启动DAG.
让我知道它是怎么回事.
"""
S3 Sensor Connection Test
"""
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 1),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')
t1 = BashOperator(
task_id='bash_test',
bash_command='echo "hello, it should work" > s3_conn_test.txt',
dag=dag)
sensor = S3KeySensor(
task_id='check_s3_for_file_in_s3',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='S3-Bucket-To-Watch',
s3_conn_id='my_conn_S3',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
Run Code Online (Sandbox Code Playgroud)
小智 10
另一个对我有用的选项是将访问密钥作为“登录”,将密钥作为“密码”:
Conn Id: <arbitrary_conn_id>
Conn Type: S3
Login: <aws_access_key>
Password: <aws_secret_key>
Run Code Online (Sandbox Code Playgroud)
将所有其他字段留空。
小智 9
如果您担心在UI中公开凭据,另一种方法是在UI中的Extra param中传递凭证文件位置.只有功能用户才具有该文件的读权限.它看起来像下面的东西
Extra: {
"profile": "<profile_name>",
"s3_config_file": "/home/<functional_user>/creds/s3_credentials",
"s3_config_format": "aws" }
Run Code Online (Sandbox Code Playgroud)
文件" /home/<functional_user>/creds/s3_credentials"有以下条目
[<profile_name>]
aws_access_key_id = <access_key_id>
aws_secret_access_key = <secret_key>
Run Code Online (Sandbox Code Playgroud)
小智 8
假设气流托管在EC2服务器上。
只需按照其他答案创建连接,但除了连接类型(应保留为S3)之外,配置中的所有内容均保留为空白
S3hook将默认为boto,这将默认为您在其上运行气流的EC2服务器的角色。假设此角色拥有S3的权限,则您的任务将能够访问存储桶。
这是比使用和存储凭据更安全的方法。
几个版本之前,我们已将此添加到我们的文档中:
http://airflow.apache.org/docs/stable/howto/connection/aws.html
AWS 连接和 S3 连接之间没有区别。
此处接受的答案在 extra/JSON 中有密钥和秘密,虽然它仍然有效(从 1.10.10 开始),但不再推荐,因为它在 UI 中以纯文本形式显示秘密。
小智 5
对于新版本,请更改上述示例中的 python 代码。
s3_conn_id='my_conn_S3'
Run Code Online (Sandbox Code Playgroud)
到
aws_conn_id='my_conn_s3'
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
20066 次 |
| 最近记录: |