标签: google-cloud-composer

使用 Terraform 的 Google Cloud Composer

我是 Terraform 新手,有没有任何直接的方法可以使用 Terraform 管理和创建 Google Cloud Composer 环境?

我检查了 GCP 支持的组件列表,似乎 Google Cloud Composer 目前还不存在。作为解决办法,我正在考虑创建一个 shell 脚本,包括所需的 gcloud Composer cli 命令并使用 Terraform 运行它,这是正确的方法吗?请提出替代方案。

terraform google-cloud-composer

1
推荐指数
1
解决办法
2913
查看次数

如何在 Cloud Composer 集群中启动云代理

如何为 Cloud Composer 集群启动云代理

目前我们使用气流来管理作业和动态 DAG 创建。为此,编写一个单独的 Dag 来检查 PostgreSQL 中的数据库表是否存在现有规则,并且如果规则在 PostgreSQL 中处于活动/非活动状态,我们会在 Airflow 中手动设置关闭/打开动态 DAG。现在,我们将使用 Google 自己的托管 Cloud Composer,但问题是我们无权访问 Cloud Composer 的数据库。如何使用云sql代理来解决这个问题呢?

google-cloud-platform kubernetes cloud-sql-proxy google-cloud-composer

1
推荐指数
1
解决办法
1199
查看次数

如何通过 kubectl 在 google-cluoud-composer 和 Airflow 1.10 上访问 Airflow

我正在使用GCP Composer来管理 GCP 上的 Apache。

对于新项目,我使用新版本的 Composer/Airflow(composer:1.6.1,Airflow:1.10)

要通过 shell 连接气流以检查损坏的 DAG,GCP 文档说明:

  1. 打开 GCP shell

  2. 连接到 GKE 集群

  3. myuser @cloudshell:~ kubectl 获取 pod

  4. myuser @cloudshell:~ kubectl exec -it airflow-worker- 1a2b3c - xyz12 -c airflow-worker -- /bin/bash

这在 Airflow 1.9 上工作正常,但在 Airflow 1.10 上kubectl get pods不显示工作 pod,而且我还没有找到有关如何在 AF 1.10 上通过 kubeclt 访问气流的文档

有人可以帮助我吗?

_myuser_@cloudshell:~ (_Myproject_)$ kubectl get pods 
NAME                                                        READY     STATUS      RESTARTS   AGE
airflow-monitoring-564c8c7dc5-hxb62                         1/1       Running     0          17h
airflow-redis-0                                             1/1       Running     0          17h
airflow-sqlproxy-594dbf87b7-nmtbh                           1/1 …
Run Code Online (Sandbox Code Playgroud)

worker airflow kubectl google-cloud-composer

1
推荐指数
1
解决办法
2444
查看次数

如何获取composer数据文件夹的GCS路径

我需要将文件从 FTP 服务器复制到特定的 GCS 位置。我正在使用 ftp_hook 将文件下载到 /data 文件夹中。我需要将此文件移动到不同的 GCS 存储桶,而不是 Composer GCS 存储桶。

我正在尝试使用GoogleCloudStorageToGoogleCloudStorageOperator运算符将文件从 Composer 存储桶复制到所需的存储桶。为此,我需要在 Airflow 任务中读取 Composer 存储桶。我不想将其添加为自定义变量,因为我的作曲家本身是动态创建的。那么如何获取我的数据文件夹所在的composer存储桶的信息呢?

airflow google-cloud-composer

1
推荐指数
1
解决办法
3853
查看次数

如何从命令行设置/获取json格式的气流变量

我无法通过 Cloud Shell 以 json 格式编辑气流变量的值。

我正在使用 cloud shell 访问我的气流变量参数(以 json 格式),当我使用以下命令时,它为我提供了完整的 json:

gcloud composer environments run composer001 
--location us-east1 variables 
--get params
Run Code Online (Sandbox Code Playgroud)

但是我想编辑 json 中的值之一,我如何访问它?

我参考了 google 上的文档和其他各种链接,但是只能找到如何设置不是 json 格式而是单值变量的变量。

google-cloud-platform google-cloud-composer

1
推荐指数
1
解决办法
3452
查看次数

如何让 Google Cloud Composer (airflow) 在不同的 kubernetes 集群上运行作业?

我想让我的 Cloud Composer 环境(Google Cloud 的托管 Apache Airflow 服务)在不同的kubernetes 集群上启动pod。我该怎么做?

请注意,Cloud composer 在 kubernetes 集群上运行气流。该集群被认为是作曲家的“环境”。使用 的默认值KubernetesPodOperator,composer 将在自己的集群上调度 pod。但是,在这种情况下,我有一个不同的 kubernetes 集群,我想在其上运行 pod。

我可以连接到工作 Pod 并在gcloud container clusters get-credentials CLUSTERNAME那里运行,但 Pod 时不时会被回收,所以这不是一个持久的解决方案。

我注意到 theKubernetesPodOperator既有 anin_cluster又有cluster_context论点,这似乎很有用。我希望这会起作用:

pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='my-task',
    name='name',
    in_cluster=False,
    cluster_context='my_cluster_context',
    image='gcr.io/my/image:version'
)
Run Code Online (Sandbox Code Playgroud)

但这导致 kubernetes.config.config_exception.ConfigException: Invalid kube-config file. Expected object with name CONTEXTNAME in kube-config/contexts list

虽然如果我kubectl config get-contexts在工作 Pod 中运行,我可以看到列出的集群配置。

所以我无法弄清楚的是:

  • 如何确保我的其他 kubernetes 集群的上下文在我的 Composer 环境的工作 …

google-cloud-platform kubernetes airflow google-cloud-composer

1
推荐指数
1
解决办法
1235
查看次数

如何让 PySpark 在 Google Cloud Composer 上运行

我发现 Google Cloud Composer 是非常有前途的托管 Apache Airflow 服务,但我不知道如何使用 Cloud Composer 通过 PySpark 代码执行管道。我可以安装其他 Python 包(例如 Pandas)并使用 Cloud Composer。

任何指针都非常感激。

apache-spark google-cloud-platform google-cloud-composer

1
推荐指数
1
解决办法
2438
查看次数

尝试将文件写入 dags 文件夹

我正在尝试创建文件夹并将文件写入dags位于 Google Cloud Storage 存储桶中的文件夹中。这使用 Airflow 使用以下 python 代码:

Path(f'/home/airflow/gcs/dags/API/config').mkdir(parents=True, exist_ok=True) 
with open(file='/home/airflow/gcs/dags/API/config/config.json', mode = 'w') as out_file:
    out_file.write(json_string)
Run Code Online (Sandbox Code Playgroud)

不会引发任何错误,但不会在任何地方创建文件夹或文件。我对实际有效的数据目录尝试了相同的方法

python bucket google-cloud-storage airflow google-cloud-composer

1
推荐指数
1
解决办法
785
查看次数

如何安排 DAG 并行运行一些任务,然后在任务完成后运行一项任务?

我有几个可以同时运行的任务。当他们完成后,我需要运行最后一个任务。我尝试使用任务分组来做到这一点,如下所示:

import airflow
from airflow.utils.task_group import TaskGroup

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    with TaskGroup(group_id='task_group_1') as tg1:

    
        task1 = MyOperator(
            task_id='task1',
            dag=dag,
        )

        task2 = MyOperator(
            task_id='task2',
            dag=dag,
        )
        
        [task1, task2]    
    
    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    tg1 >> final_task
   
Run Code Online (Sandbox Code Playgroud)

然而,这里发生的情况是 Final_task 在任务组中的每个任务之后运行多次,因此:

任务1 -> 最终任务 任务2 -> 最终任务

我想要的是任务组并行运行,并且当最终任务完成时只运行一次,如下所示:

[任务1,任务2] -> 最终任务

我认为使用任务组可以帮助我完成此要求,但它没有按预期工作。有人可以帮忙吗?谢谢。

编辑:这是 Airflow 文档示例的结果。它导致task3在group.task1和group1.task2之后运行。我需要它在两个分组任务完成后只运行一次。

在此输入图像描述

最后编辑:事实证明我误解了树视图 - 图形视图确认了分组操作,尽管我仍然收到最终任务的一些其他错误。感谢您帮助我了解有关 DAG 的更多信息。

airflow google-cloud-composer

1
推荐指数
1
解决办法
3308
查看次数

云作曲家中没有名为“gcp_sql_operator”的模块

我无法将语句导入为-

from airflow.contrib.operators.gcp_sql_operator import CloudSqlQueryOperator
Run Code Online (Sandbox Code Playgroud)

我想将它导入到我的 DAG 文件中,该文件将在版本为 1.10.0 而不是 1.9.0 的云作曲家气流中运行。这里只是为了检查,我尝试将 gcs_to_gcs 导入为-

from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
Run Code Online (Sandbox Code Playgroud)

我可以导入它,但不能导入 gcp_sql_operator。

google-cloud-sql google-cloud-composer

0
推荐指数
1
解决办法
820
查看次数

Slack SlackAPIPostOperator 在气流编辑器中无法正常工作

当 Google Cloud Composer 中的气流中 DAG 运行失败时,我试图向 Tom slack 发送通知。使用的airflow版本是1.9,所以我不能使用slack webhooks。但是当我添加代码时,我收到这个奇怪的错误:没有名为 \'slackclient\' 的模块

\n\n

我不知道如何在谷歌云作曲家中进行这项工作。我尝试通过在 Composer 中添加 PyPi 变量来安装 slack 包。但到目前为止没有任何效果。\n有人可以帮忙吗?

\n\n

我的代码:

\n\n
from slackclient import SlackClient\nfrom airflow.operators.slack_operator import SlackAPIPostOperator\n\nslack_channel= \'gsdgsdg\'\nslack_token = \'ssdfhfdrtxcuweiwvbnw54135f543589zdklchvf\xc3\xb6\'\n\ndef task_fail_slack_alert(context):\n\n    slack_msg = \\\n    """\n        :red_circle: Task Failed. \n        *Task*: {task}  \n        *Dag*: {dag} \n        *Execution Time*: {exec_date}  \n        *Log Url*: {log_url} \n        """.format(task=context.get(\'task_instance\'\n        ).task_id, dag=context.get(\'task_instance\').dag_id,\n        ti=context.get(\'task_instance\'),\n        exec_date=context.get(\'execution_date\'),\n        log_url=context.get(\'task_instance\').log_url)\n\n    failed_alert = SlackAPIPostOperator(\n            task_id = \'airflow_etl_failed\',\n            channel = slack_channel,\n            token = slack_token,\n            text = slack_msg\n    )\n\n\n    return …
Run Code Online (Sandbox Code Playgroud)

slack airflow google-cloud-composer

0
推荐指数
1
解决办法
2634
查看次数

如何使用apache气流计划谷歌云bigquery存储过程

我想在 apache airflow 中安排一个 google cloud bigquery 存储过程。我没有在气流中看到任何文档。我应该使用哪个调度程序在 apache airflow 上调度 bigquery 存储过程。你能给我看一些例子吗?太感谢了。

https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/bigquery.html#execute-bigquery-jobs

google-bigquery airflow google-cloud-composer

0
推荐指数
1
解决办法
4393
查看次数

升级 GCP Composer 中的 python 版本

我们有一个 Python 版本 3 的 GCP Composer。我想将版本更新到 Python 3.9,但我无法找到编辑选项来在我的 Composer 中进行此更改。谁能帮助我实现这一目标?

任何帮助表示赞赏。

python google-cloud-platform google-cloud-composer

0
推荐指数
1
解决办法
1538
查看次数