sag*_*sag 1 airflow google-cloud-composer
我需要将文件从 FTP 服务器复制到特定的 GCS 位置。我正在使用 ftp_hook 将文件下载到 /data 文件夹中。我需要将此文件移动到不同的 GCS 存储桶,而不是 Composer GCS 存储桶。
我正在尝试使用GoogleCloudStorageToGoogleCloudStorageOperator运算符将文件从 Composer 存储桶复制到所需的存储桶。为此,我需要在 Airflow 任务中读取 Composer 存储桶。我不想将其添加为自定义变量,因为我的作曲家本身是动态创建的。那么如何获取我的数据文件夹所在的composer存储桶的信息呢?
更新:
我刚刚发现(也许这是新事物)您可以使用存储桶访问环境变量。这是在 Composer 中自动定义的。
COMPOSER_BUCKET = os.environ["GCS_BUCKET"]
Run Code Online (Sandbox Code Playgroud)
原来的:
我不是 100% 确定您是否想动态执行此操作(即相同的 DAG 无需任何修改即可在其他 Composer 环境中工作),无论哪种方式,这就是我的想法:
(不是动态的)你可以在环境中点击查看Composer使用的bucket,它应该在“DAGs文件夹”下(实际上是DAGs所在的文件夹,只需取出/dags)
(动态)由于您想要将文件从 Composer 复制到 GCS,因此您可以使用 FileToGoogleCloudStorageOperator并使用映射到Composer Bucket 的文件。请注意,本地存储和 Composer 存储桶是相互映射的,因此访问路径home/airflow/gcs/data/file1与gs://<bucket>/data/file1.
(半动态)您可以使用Composer API获取环境详细信息并解析存储桶。当然,你需要事先知道名称、地点和项目。
在这三个中,我想说使用FileToGoogleCloudStorageOperator的那个是最干净、最简单的。
| 归档时间: |
|
| 查看次数: |
3853 次 |
| 最近记录: |