标签: google-cloud-composer

如何使用Cloud Composer下载和访问文件?

我有一些与文件相关的用例,我不确定如何使用Cloud Composer最好地完成.我该如何最好地完成这些?

1)我需要使用私钥(.pem)文件来访问SFTP服务器.该文件应存放在何处以及如何访问?在本地Airflow中,我将文件放在/ dags /所在目录下的文件夹/ keys /中.

2)我需要将文件从SFTP服务器移动到云存储.使用Airflow,我将这些从SFTP服务器下载到Airflow工作器实例上的特定位置,然后从那里上传.我可以使用Composer做类似的事情,还是因为我无法访问文件系统而有解决方法?

google-cloud-platform google-cloud-composer

2
推荐指数
1
解决办法
2051
查看次数

如何读取气流云作曲家存储桶中的文件?

要将 bigquery 查询与实际代码分开,我想将 sql 存储在一个单独的文件中,然后从 python 代码中读取它。我试图将文件添加到与 DAG 相同的存储桶中,也添加到子文件夹中,但是当气流使用 sql 文件运行我的 python 脚本时,我似乎无法读取该文件。

我想要的是这个:

gs://my-bucket/dags -> store dags
gs://my-bucket/dags/sql -> store sql files
Run Code Online (Sandbox Code Playgroud)

sql 文件可能是我需要先阅读以注入 jinja 模板不支持的内容的文件。

我可以做以上吗?

google-cloud-composer

2
推荐指数
1
解决办法
3837
查看次数

使用 gdrive 范围添加 BigQuery 连接?

我有一个外部表格表,我想通过 Airflow 中的 BigQueryOperator 进行查询。

我更愿意使用 Cloud Composer 服务帐户。

我通过 Airflow UI 使用以下参数创建了一个新连接:

Conn Id: bigquery_with_gdrive_scope
Conn Type: google_cloud_platform
Project Id: <my project id>
Keyfile path: <none>
Keyfile JSON: <none>
Scopes: https://www.googleapis.com/auth/bigquery,https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/drive
Run Code Online (Sandbox Code Playgroud)

在我的 DAG 中,我使用: BigQueryOperator(..., bigquery_conn_id='bigquery_with_gdrive_scope')

日志报告: Access Denied: BigQuery BigQuery: No OAuth token with Google Drive scope was found.

任务属性显示: bigquery_conn_id bigquery_with_gdrive_scope

就好像bigquery_conn_id参数被忽略了一样。

google-cloud-composer

2
推荐指数
1
解决办法
1504
查看次数

如何使用 Cloud Composer/Apache Airflow 运行带有设置文件的 Dataflow 管道?

我有一个工作数据流管道,第一次运行setup.py以安装一些本地帮助程序模块。我现在想使用 Cloud Composer/Apache Airflow 来调度管道。我已经创建了 DAG 文件,并将其与管道项目一起放置在指定的 Google Storage DAG 文件夹中。文件夹结构如下所示:

{Composer-Bucket}/
    dags/
       --DAG.py
       Pipeline-Project/
           --Pipeline.py
           --setup.py
           Module1/
              --__init__.py
           Module2/
              --__init__.py
           Module3/
              --__init__.py
Run Code Online (Sandbox Code Playgroud)

我的 DAG 中指定 setup.py 文件的部分如下所示:

resumeparserop = dataflow_operator.DataFlowPythonOperator(
    task_id="resumeparsertask",
    py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
    dataflow_default_options={
        "project": {PROJECT-NAME},    
        "setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})
Run Code Online (Sandbox Code Playgroud)

但是,当我查看 Airflow Web UI 中的日志时,出现错误:

RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.
Run Code Online (Sandbox Code Playgroud)

我不确定为什么找不到安装文件。如何使用设置文件/模块运行我的数据流管道?

python-2.7 google-cloud-dataflow airflow google-cloud-composer

2
推荐指数
1
解决办法
2180
查看次数

如何使用 Google Cloud Composer CLI 设置 AWS 凭证?

我想从我的云编写器管理的气流系统访问 AWS 服务。我不想从气流 UI 中进行设置——我想使用充当 CLI 的 Google Cloud SDK 来完成此操作。

我唯一的选择是使用 CLIenv-variables选项吗?如果是这样,我只需设置AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYenv 变量就足够了吗?或者我是否需要设置大量特定于气流的连接变量,例如 s3 AIRFLOW_CONN_S3_DEFAULT

airflow google-cloud-composer

2
推荐指数
1
解决办法
1634
查看次数

创建新的云作曲家环境时,是否可以将存储桶设置为预先存在的存储桶?

因此,我已经为此创建了一个空存储桶,并且我不希望 Composer 为 dags 创建自己的存储桶 - 我想使用已经创建的存储桶。

仅仅创建一个随机存储桶然后去是不理想的

gcloud composer environments run test-environment  --location europe-west1 variables -- --set gcs_bucket gs://my-bucket
Run Code Online (Sandbox Code Playgroud)

我已经深入研究了文档,但似乎您不能每次都创建一个全新的存储桶?

gcloud google-cloud-composer

2
推荐指数
1
解决办法
1548
查看次数

Google Cloud Composer BigQuery Operator - 获取作业 API HTTPError 404

我正在尝试在 GCC 上运行 BigQueryOperator。我已经成功运行 BigQueryCreateEmptyTableOperator 和 BigQueryTableDeleteOperator。

这是我的 dag 代码:

import datetime
import os
import logging


from airflow import configuration
from airflow import models
from airflow import DAG
from airflow.operators import email_operator
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_check_operator
from airflow.utils import trigger_rule
from contextlib import suppress
import json
from airflow.operators import python_operator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in …
Run Code Online (Sandbox Code Playgroud)

python google-api google-bigquery airflow google-cloud-composer

2
推荐指数
1
解决办法
5007
查看次数

Google Composer - 如何在环境中安装 Microsoft SQL Server ODBC 驱动程序

我是 GCP 和 Airflow 的新手,正在尝试通过 python 3 通过简单的 PYODBC 连接运行我的 python 管道。但是,我相信我已经找到了需要在机器上安装的内容 [Microsoft doc] https://docs.microsoft .com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-2017,但我不知道该去哪里GCP 来运行这些命令。我已经钻了几个深坑寻找答案,但不知道如何解决问题

这是我上传 DAG 时不断看到的错误:

气流错误

这是 PYODBC 连接:

pyodbc.connect('DRIVER={Microsoft SQL Server};SERVER=servername;DATABASE=dbname;UID=username;PWD=password')
Run Code Online (Sandbox Code Playgroud)

当我在环境中打开我的 gcloud shell 并运行 Microsoft 下载时,它只是中止,当我下载 SDK 并从本地下载连接到项目时,它会自动中止或无法识别来自 Microsoft 的命令。任何人都可以就从哪里开始以及我做错了什么给出一些简单的说明吗?

sql-server google-cloud-platform airflow google-cloud-composer

2
推荐指数
1
解决办法
1166
查看次数

在气流中特定时间运行任务

我在气流 dag 中有 3 个任务。

这三个任务具有时间依赖性

任务 1 - 早上 8 点

任务 - 凌晨 2 点 - 上午 10 点

任务-3 - 上午 12 点

我没有找到任何提到这一点的文档。它告诉我们只设置上游或下游作业。谁能帮忙解决这个问题

我正在使用 Google Cloud Composer

jenkins airflow airflow-scheduler google-cloud-composer

2
推荐指数
1
解决办法
5245
查看次数

Google Cloud Composer:节省成本

我想弄清楚如何通过 Google Cloud Composer 节省成本。无论如何,当您的所有 dag 都没有运行时,是否可以关闭服务器?然后在需要运行 dag 时再次启动它?

它的成本太高了,因为我相信即使我的 dag 没有运行,服务器也会保持运行并且我们正在收费。

谢谢,

airflow google-cloud-composer

2
推荐指数
1
解决办法
1088
查看次数