con*_*lee 5 airflow google-cloud-composer
我BigQueryOperator在 Google Cloud Composer 上的 Airflow DAG 中广泛使用。
对于较长的查询,最好将每个查询放在自己的.sql文件中,而不是用它来弄乱 DAG。Airflow 似乎支持所有 SQL 查询运算符,包括 BigQueryOperator,如您在文档中所见。
我的问题:在.sql模板文件中编写了我的 sql 语句后,如何将其添加到 Google Cloud Composer 并在 DAG 中引用它?
alv*_*ila 11
我找到了解决这个问题的理想方法。在 dag 声明中,您可以设置template_searchpathAirflow 查找 jinja 模板文件的默认路径。
为了使其在您的 Cloud Composer 实例中工作,您必须按以下方式进行设置
dag = DAG(
...
template_searchpath=["/home/airflow/gcs/plugins"],
)
Run Code Online (Sandbox Code Playgroud)
请注意,我在本示例中使用了插件文件夹。您可以使用数据文件夹或您想要在存储桶中使用的任何文件夹。
在谷歌搜索并找到这个相关问题之后。我找到了一种方法来完成这项工作(尽管这不是理想的解决方案,我们将看到)。这是一个包含三个部分的工作示例:
gcloud模板上传到正确位置所需的命令。(1) sql模板文件
这只是一个文件名以.sql扩展名结尾的文本文件。假设这个文件被调用my-templated-query.sql并包含:
SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')
Run Code Online (Sandbox Code Playgroud)
(2)引用 DAG 文件中的模板 要引用此模板,请创建如下操作符:
count_task = BigQueryOperator(
task_id='count_rows',
sql='/my-templated-query.sql')
Run Code Online (Sandbox Code Playgroud)
(3)将模板文件添加到Google Cloud Composer 原来,airflow 默认在dags 文件夹中查找模板文件。要将我们的模板文件上传到 dags 文件夹,我们运行
gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql
Run Code Online (Sandbox Code Playgroud)
您必须相应地替换环境名称、位置和源路径。
将所有这些模板上传到 dag 文件夹似乎不太正确。更好的 Airflow 实践是将您的模板放在它们自己的文件夹中,并在创建 DAG 时指定指向它的template_searchpath参数。但是,我不确定如何使用 Google Cloud Composer 执行此操作。
更新:我意识到可以将子文件夹放在 DAG 文件夹中,这对于组织大量 SQL 模板很有用。假设我DAG_FOLDER/dataset1/table1.sql 在 BigQueryOperator 中放置了一个 SQL 模板文件,然后可以使用sql=/dataset1/table1.sql. 如果您有一个包含大量文件的子文件夹和许多其他子文件夹,您还可以使用dag import上面显示的I 递归上传整个子文件夹——只需将其指向子文件夹。
| 归档时间: |
|
| 查看次数: |
5461 次 |
| 最近记录: |