标签: google-cloud-composer

如何在Cloud Composer中重新启动Web服务器

我们最近遇到了一个关于气流的已知问题:

气流"此DAG在网络服务器DagBag对象中不可用"

现在我们使用临时解决方案通过更改配置来重新启动整个环境,但这不是一种有效的方法.现在我们认为最好的解决方法是重新启动云编写器上的Web服务器,但我们没有找到任何重启webserver的命令.这可能是一个动作吗?

谢谢!

google-cloud-platform airflow google-cloud-composer

7
推荐指数
1
解决办法
661
查看次数

没有名为airfow.gcp的模块-如何运行使用python3 / beam 2.15的数据流作业?

当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?

我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?

python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator

7
推荐指数
1
解决办法
159
查看次数

当超过“dagrun_timeout”时,气流触发“on_failure_callback”

目前正在致力于为 Airflow 中长时间运行的任务设置警报。为了取消/使气流 dag 失败,我在 default_args 中放入了“dagrun_timeout”,它执行了我需要的操作,当 dag 运行时间过长(通常被卡住)时,它会失败/出错。唯一的问题是,当超过 dagrun_timeout 时,“on_failure_callback”中的函数不会被调用,因为“on_failure_callback”位于任务级别(我认为),而 dagrun_timeout 位于 dag 级别。

当超过 dagrun_timeout 时,如何执行“on_failure_callback”,或者如何指定 dag 失败时要调用的函数?或者我应该重新考虑我的方法?

alert directed-acyclic-graphs airflow google-cloud-composer

7
推荐指数
1
解决办法
9407
查看次数

如何在本地安装与 GCP 上的 Cloud Composer Airflow 环境中安装的相同的 pip 依赖项?

我正在尝试在 VS Code 中设置本地开发环境,在其中我可以获得 Cloud Composer/Apache Airflow 使用的包的代码完成。到目前为止,我已经成功地使用了虚拟环境(使用 创建python -m venv .venv)和一个非常小的requirements.txt文件,该文件仅包含安装到本地环境中的 Airflow 包。

该文件是这样的:

apache-airflow==1.10.15
Run Code Online (Sandbox Code Playgroud)

我可以pip install -r requirements.txt在 VS Code 中激活我的虚拟环境后运行,将其安装到我的虚拟环境中,之后我在 VS Code 中获得其文档中快速入门 DAG 的代码完成,即BashOperator

随着我学习更多教程,我希望获得更多代码补全。例如,按照KubernetesPodOperator教程(https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator),我收到此错误,并且 VS Code 无法识别导入:

无法解析导入“airflow.providers.cncf.kubernetes.operators.kubernetes_pod”Pylance(reportMissingImports)

我认为下一步最好是将与在 Cloud Composer 环境中运行的完全相同的 PyPI 包安装到我的虚拟环境中。我使用页面https://cloud.google.com/composer/docs/concepts/versioning/composer-versions来查看安装了哪些软件包:

GCP UI 中的版本

所以我的requirements.txt文件看起来像这样:

absl-py==1.0.0
alembic==1.5.7
amqp==2.6.1
apache-airflow==1.10.15+composer
apache-airflow-backport-providers-apache-beam==2021.3.13
apache-airflow-backport-providers-cncf-kubernetes==2021.3.3
apache-airflow-backport-providers-google==2022.4.1+composer
apache-beam==2.37.0
apispec==1.3.3
appdirs==1.4.4
argcomplete==1.12.2
astunparse==1.6.3
attrs==20.3.0
Babel==2.9.0
bcrypt==3.2.0
billiard==3.6.3.0
cached-property==1.5.2
cachetools==4.2.1
cattrs==1.1.2
celery==4.4.7
certifi==2020.12.5
cffi==1.14.5
chardet==4.0.0
click==6.7 …
Run Code Online (Sandbox Code Playgroud)

python pip google-cloud-platform airflow google-cloud-composer

7
推荐指数
1
解决办法
2949
查看次数

Google Cloud Composer和Google Cloud SQL

我们有哪些方法可以从新推出的Google Cloud Composer连接到Google Cloud SQL(MySQL)实例?目的是将Cloud SQL实例中的数据导入BigQuery(可能通过云存储实现中间步骤).

  1. Cloud SQL代理是否可以在托管上以某种方式暴露给托管Composer的Kubernetes集群?

  2. 如果没有,可以使用Kubernetes Service Broker引入Cloud SQL Proxy吗?- > https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker

  3. 应该使用Airflow来安排和调用GCP API命令,例如1)将mysql表导出到云存储2)读取mysql导出到bigquery?

  4. 也许还有其他方法让我无法完成这项工作

google-cloud-sql google-cloud-platform airflow google-cloud-composer

6
推荐指数
2
解决办法
2556
查看次数

在哪里保存用于 Google Cloud Composer 连接设置的服务帐户密钥文件?

我正在尝试使用服务帐户密钥在 Google Cloud Composer 中设置 Google Cloud Platform 连接。所以我创建了一个 GCS 存储桶并将服务帐户密钥文件放在存储桶中。密钥存储在 JSON 中。在密钥文件路径字段中,我指定了一个 GCS 存储桶,在密钥文件 JSON 字段中,我指定了文件名。范围是https://www.googleapis.com/auth/cloud-platform

尝试使用此连接启动 Dataproc 集群时,出现找不到 JSON 文件的错误。

查看错误消息,代码尝试使用以下方法解析文件: with open(filename, 'r') as file_obj这显然不适用于 GCS 存储桶路径。

所以我的问题是,如果不能将这个服务帐户密钥文件放在 GCS 路径中,我应该把它放在哪里?

google-cloud-platform airflow google-cloud-composer

6
推荐指数
1
解决办法
6112
查看次数

如何经济高效地配置 Google Cloud Composer

经过一些研究和测试,我们决定开始使用 Google Cloud Composer。由于我们当前的 DAG 和任务相对较小,并且不需要服务器连续运行,因此我正在寻找如何管理成本。

两个问题:

  1. 使用preemptible虚拟机的选项似乎合乎逻辑。这大大节省了成本,我正在考虑选择 3x n1-standard-4。我希望每个任务都很短,所以不要认为这会对我们的工作负载产生重大影响。是否可以将preemptibleVM 与 Composer一起使用?
  2. 计划打开/关闭 Composer 环境,如本文所述。我在文档中找不到如何做到这一点,要么通过关闭整个环境,要么按照答案中的建议关闭工作人员。

帮助,有人吗?

airflow google-cloud-composer

6
推荐指数
1
解决办法
2041
查看次数

使用插件导入DAG时出现气流错误 - 只能在操作员之间设置关系

我编写了一个气流插件,它只包含一个自定义运算符(以支持BigQuery中的CMEK).我可以使用单个任务创建一个简单的DAG,该任务使用此运算符并且执行正常.

但是,如果我尝试在DAG中从DummyOperator任务创建依赖关系到我的自定义操作员任务,则DAG无法在UI中加载并抛出以下错误,我无法理解为什么会抛出此错误?

破坏的DAG:[/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py]关系只能在运营商之间设置; 收到BQCMEKOperator

到目前为止,我已经在composer-1.4.2-airflow-1.9.0,composer-1.4.2-airflow-1.10.0和composer-1.4.1-airflow-1.10.0上进行了测试.

每个任务的运行气流测试都可以顺利完成.

在DAG中单独使用它可以正常工作(如下所示)所以我不相信插件本身存在任何错误

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator


default_dag_args = {
    'start_date': datetime.datetime(2019,1,1),
    'retries': 0
}


dag = DAG(
    'js_bq_custom_plugin',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

run_this = BQCMEKOperator(
    task_id     = 'cmek_plugin_test',
    sql         = 'select * from ds.foo LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test10',
    key         = 'xxx',
    dag     = dag
)
Run Code Online (Sandbox Code Playgroud)

然而,如果我引入DummyOperator和依赖项,则会发生错误

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from …
Run Code Online (Sandbox Code Playgroud)

airflow google-cloud-composer

6
推荐指数
1
解决办法
1183
查看次数

Cloud Composer 与 Cloud Scheduler

我目前正在学习 GCP 数据工程师考试,并且一直在努力了解何时使用 Cloud Scheduler 以及何时使用 Cloud Composer。

通过阅读文档,我的印象是,当作业之间存在相互依赖关系时,应该使用 Cloud Composer,例如,我们需要一个作业的输出在第一个完成时启动另一个作业,并使用来自第一个作业的依赖项。然后,您可以根据需要灵活地链接任意数量的这些“工作流”,并提供在失败时重新启动作业、运行批处理作业、shell 脚本、链查询等的机会。

对于 Cloud Scheduler,它在可以执行哪些任务方面具有非常相似的功能,但是,它更多地用于常规作业,您可以定期执行,并且在作业之间存在相互依赖关系或在开始另一个工作之前,您需要等待其他工作。因此,似乎更适合用于“更简单”的任务。

这些想法是在尝试回答我发现的一些考试问题之后产生的。然而,我对我找到的“正确答案”感到惊讶,并希望有人能澄清这些答案是否正确,以及我是否理解何时使用一个而不是另一个。

以下是有关此主题的让我感到困惑的示例问题:

问题 1

您正在实施几个必须按计划执行的批处理作业。这些作业有许多必须按特定顺序执行的相互依赖的步骤。部分作业涉及执行 shell 脚本、运行 Hadoop 作业以及在 BigQuery 中运行查询。这些作业预计会运行几分钟到几个小时。如果这些步骤失败,则必须重试固定次数。您应该使用哪种服务来管理这些作业的执行?

A. 云调度器

B. 云数据流

C. 云功能

D. 云作曲家

正确答案:A

问题2

您想要自动执行在 GCP 上运行的多步骤数据管道。该管道包括相互具有多个依赖关系的 Cloud Dataproc 和 Cloud Dataflow 作业。您希望尽可能使用托管服务,并且管道将每天运行。您应该使用哪种工具?

A. cron

B. 云作曲家

C. 云调度器

D. Cloud Dataproc 上的工作流模板

正确答案:D

问题 3

您的公司有一个混合云计划。您有一个复杂的数据管道,可以在云提供商服务之间移动数据并利用每个云提供商的服务。您应该使用哪种云原生服务来编排整个管道?

A. 云数据流

B. 云作曲家

C. 云数据准备

D. 云数据过程

正确答案:D

对此的任何见解将不胜感激。谢谢 !

google-cloud-platform google-cloud-composer google-cloud-scheduler

6
推荐指数
2
解决办法
4581
查看次数

创建具有相同 DAGS 的新环境时缺少 FERNET_KEY 配置

我在 Google Cloud 中使用 Composer (Airflow)。我想创建一个新环境,并将旧环境中相同的 DAG 和变量带到新环境中。

为此,我执行以下操作:

  • 我检查了几个变量并将它们导出到 JSON 文件。
  • 在我的新环境中,我导入了相同的 JSON 文件。
  • 我使用gsutil相同的 DAG 并将其上传到新环境

但是,在新环境中,由于FERNET_KEY configuration is missing. 我最好的猜测是,这与导入使用单独的 Fernet 密钥加密的变量有关,但我不确定。

有没有人遇到过这个问题?如果是这样,你是如何解决的?

python python-3.x airflow google-cloud-composer

6
推荐指数
1
解决办法
2770
查看次数