标签: google-cloud-composer

运行 Google 的 Cloud Compose 时,气流 dag 依赖项对 dag 不可用

Airflow 允许您将 dag 依赖的依赖项(外部 python 代码到 dag 代码)放在 dag 文件夹中。这意味着这些外部 python 代码中的任何组件/成员或类都可用于 dag 代码。

但是,在执行此操作时(在云组合环境的 GCS dag 文件夹中),依赖项的组件对 dag 不可用。Airflow Web UI 中显示类似于以下内容的错误:Broken DAG: [/home/airflow/gcs/dags/....py] No module named tester. 其中 tester 是 dags 文件夹中的一个单独的 python 文件。

在使用 Google 的 SDK(运行实际的 Airflow 命令)测试这些任务时,任务运行良好,但似乎在 Kubernettes 中的某个地方创建了这些容器作业,它似乎也没有接管依赖项。

我意识到 Cloud Compose 处于测试阶段,但我想知道我是否做错了什么。

google-cloud-platform google-kubernetes-engine airflow google-cloud-composer

3
推荐指数
1
解决办法
5082
查看次数

云作曲家构建日志,它们在哪里?

我正在尝试按照本指南在 cloud composer 上安装 pypi 依赖项。

构建失败,错误消息说:

name: "operations/14021472-6dbe-42b3-8ec1-ba7ac62be60e"
done: true
sequence_number: 1
error {
  code: 0
  message: "The image build failed: Build failed; check build logs for details\n"
}
Run Code Online (Sandbox Code Playgroud)

但是,我找不到构建日志的位置。有人知道答案吗?谢谢

仅供参考,这是我尝试安装的软件包列表(作为requirements.txt文件):

alembic==0.8.10
bleach==2.1.2
boto3==1.4.5
botocore==1.5.92
certifi==2016.2.28
click==6.7
croniter==0.3.24
dill==0.2.8.2
docutils==0.14
flask==0.11.1
flask-admin==1.4.1
flask-cache==0.13.1
flask-login==0.2.11
flask-swagger==0.2.13
flask-wtf==0.14.2
future==0.16.0
gitdb2==2.0.4
gitpython==2.1.11
gunicorn==19.9.0
html5lib==1.0.1
itsdangerous==0.24
jinja2==2.8
jmespath==0.9.0
lockfile==0.12.2
lxml==3.8.0
mako==1.0.7
markdown==2.6.11
markupsafe==1.0
numpy==1.13.1
pandas==0.23.3
psutil==4.4.2
pygments==2.2.0
pyspark
python-daemon==2.1.2
python-dateutil==2.7.3
python-editor==1.0.3
python-nvd3==0.14.2
python-slugify==1.2.5
pytz==2018.5
pyyaml==3.12
requests==2.13.0
s3transfer==0.1.10
setproctitle==1.1.10
six==1.11.0 …
Run Code Online (Sandbox Code Playgroud)

google-cloud-composer

3
推荐指数
1
解决办法
872
查看次数

在 Google Cloud Composer 中使用 json 文件导入变量

如何使用命令行将 json 文件导入 Google Cloud Composer?

我试过下面的命令

gcloud composer environments run comp-env --location=us-central1 variables -- --import composer_variables.json
Run Code Online (Sandbox Code Playgroud)

我收到以下错误

[2019-01-17 13:34:54,003] {configuration.py:389} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-01-17 13:34:54,117] {app.py:44} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
Missing variables file.
Run Code Online (Sandbox Code Playgroud)

但是当我使用以下命令设置单个变量时,它工作正常。

gcloud composer environments run comp-env --location=us-central1 variables -- --set variable_name variable_value
Run Code Online (Sandbox Code Playgroud)

由于我要导入的变量超过 75 个,因此我们需要使用 json 文件导入它。请帮我解决这个问题

google-cloud-platform airflow google-cloud-composer

3
推荐指数
1
解决办法
2750
查看次数

如何使用 bigquery 运算符将查询参数传递给 sql 文件

我需要在 sql 文件中访问 BigqueryOperator 传递的参数,但是我ERROR - queryParameters argument must have a type <class 'dict'> not <class 'list'> 使用以下代码时出现错误:

t2 = bigquery_operator.BigQueryOperator(
task_id='bq_from_source_to_clean',
sql='prepare.sql',
use_legacy_sql=False,
allow_large_results=True,
query_params=[{ 'name': 'threshold_date', 'parameterType': { 'type': 'STRING' },'parameterValue': { 'value': '2020-01-01' } }],
destination_dataset_table="{}.{}.{}".format('xxxx',
                                            'xxxx',
                                            'temp_airflow_test'),
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
dag=dag
Run Code Online (Sandbox Code Playgroud)

)

查询:

select  cast(DATE_ADD(a.dt_2, interval 7 day) as DATE) as dt_1
,a.dt_2
,cast('2010-01-01' as DATE) as dt_3 
from (select cast(@threshold_date as date) as dt_2) a
Run Code Online (Sandbox Code Playgroud)

我正在使用 Google 作曲家版本composer-1.7.0-airflow-1.10.2

提前致谢。

airflow google-cloud-composer

3
推荐指数
1
解决办法
4184
查看次数

使用 Python 3.7 将文件添加到云存储时如何使用云函数触发云作曲家 DAG

每次将文档放入某个存储桶时,我都想启动 DAG 工作流来分析此文档。我需要使用云功能使用云存储触发器和事件类型来触发 DAG 工作流完成并创建

python google-cloud-functions google-cloud-composer

3
推荐指数
1
解决办法
2031
查看次数

使用 Pub/Sub 消息触发 Cloud Composer DAG

我正在尝试创建一个通过 Pub/Sub 消息触发的 Cloud Composer DAG。Google 提供了以下示例,每次 Cloud Storage 存储桶中发生更改时都会触发 DAG: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf

然而,一开始他们就说you can trigger DAGs in response to events, such as a change in a Cloud Storage bucket or a message pushed to Cloud Pub/Sub。我花了很多时间尝试弄清楚如何做到这一点,但没有结果。

您能帮忙或给我一些指示吗?谢谢!

directed-acyclic-graphs google-cloud-pubsub airflow google-cloud-composer

3
推荐指数
1
解决办法
6730
查看次数

googleapi:错误 400:先决条件检查失败。通过 Terraform 创建 Cloud Composer 环境时失败了先决条件

我正在尝试通过 Terraform 创建 Cloud Composer 环境并收到此错误

googleapi:错误 400:先决条件检查失败。通过 Terraform 创建 Cloud Composer 环境时失败了先决条件

我尝试从中创建 Composer 的 VM 的服务帐户在 GCP 项目中具有所有者权限。

我已尝试使用 GCP 控制台中的相同作曲家配置,并且创建的环境没有任何问题。

我尝试禁用 Cloud Composer API 并再次启用它,但没有解决方案。

最终,第一次执行 terraform apply 时,它试图创建 Composer 环境,但最终出现版本错误,我更改了 Composer 的 Image 版本。现在我面临这个问题。有人可以帮忙吗?

来自终端的错误消息

作曲家/main.tf

    resource "google_composer_environment" "etl_env" {
    provider = google-beta
    name     = var.env_name
    region   = var.region
    config {
    node_count = 3

    node_config {
      zone         = var.zone
      machine_type = var.node_machine_type

      network    = var.network
      subnetwork = var.app_subnet_selflink

      ip_allocation_policy {
      use_ip_aliases = true
    }
   }

   software_config …
Run Code Online (Sandbox Code Playgroud)

google-cloud-platform terraform google-cloud-composer

3
推荐指数
1
解决办法
7497
查看次数

GCP Apache Airflow - 如何从私有存储库安装 Python 包并在 DAG 上导入?

我有一个私人存储库。这个存储库有我的 DAG 的常用功能。(例如:日期时间验证器、响应编码器函数)我想将此存储库的函数导入到我的 DAG 文件中,我使用此链接来执行此操作。

我创建了pip.conf文件。该文件的位置是:my-bucket-name/config/pip/pip.conf并且我在该文件中添加了我的私人 github 存储库,如下所示:

[global]
extra-index-url=https://<token>@github.com/my-private-github-repo.git
Run Code Online (Sandbox Code Playgroud)

之后,我想在我的 dag 文件上导入此存储库的函数(例如:from common-repo import *),但我在我的 DAG 上收到 “模块未找到”错误。(不幸的是,在云作曲家日志中,我看不到任何显示私人 github 存储库已安装的日志。)

我搜索了很多,但找不到如何做到这一点。

python google-cloud-platform airflow google-cloud-composer

3
推荐指数
1
解决办法
1371
查看次数

如何导入 2.2.5 版本的 Airflow 运算符?

我刚刚将 Airflow 升级到 2.2.5,但无法使用 EmptyOperator。它应该很简单from airflow.operators.empty import EmptyOperator,但我收到错误ModuleNotFoundError: No module named 'airflow.operators.empty'。我也尝试过:

from airflow.operators import empty
from empty.operators import EmptyOperator
Run Code Online (Sandbox Code Playgroud)

Airflow 存储库本身显示了应该可以工作的结构 from airflow.operators.empty import EmptyOperator,但事实并非如此,所以我对发生的事情感到非常困惑。

airflow google-cloud-composer

3
推荐指数
1
解决办法
3511
查看次数

Cloud Composer(Airflow)作业卡住了

自从我取消了一个耗时太长的任务实例以来,我的Cloud Composer管理人员Airflow被困了几个小时(我们称它为任务A)

我已经清除了所有DAG运行和任务实例,但是有几个正在运行的作业和一个处于“关机”状态的作业(我想是任务A的作业)(我的作业的快照)。

此外,由于最近删除的DAG一直出现在仪表板中,因此调度程序似乎未运行

有没有办法杀死工作或重置调度程序?不拘泥作曲家的任何想法都将受到欢迎。

google-cloud-platform airflow google-cloud-composer

2
推荐指数
1
解决办法
2523
查看次数