我是 Terraform 新手,有没有任何直接的方法可以使用 Terraform 管理和创建 Google Cloud Composer 环境?
我检查了 GCP 支持的组件列表,似乎 Google Cloud Composer 目前还不存在。作为解决办法,我正在考虑创建一个 shell 脚本,包括所需的 gcloud Composer cli 命令并使用 Terraform 运行它,这是正确的方法吗?请提出替代方案。
如何为 Cloud Composer 集群启动云代理
目前我们使用气流来管理作业和动态 DAG 创建。为此,编写一个单独的 Dag 来检查 PostgreSQL 中的数据库表是否存在现有规则,并且如果规则在 PostgreSQL 中处于活动/非活动状态,我们会在 Airflow 中手动设置关闭/打开动态 DAG。现在,我们将使用 Google 自己的托管 Cloud Composer,但问题是我们无权访问 Cloud Composer 的数据库。如何使用云sql代理来解决这个问题呢?
google-cloud-platform kubernetes cloud-sql-proxy google-cloud-composer
我正在使用GCP Composer来管理 GCP 上的 Apache。
对于新项目,我使用新版本的 Composer/Airflow(composer:1.6.1,Airflow:1.10)
要通过 shell 连接气流以检查损坏的 DAG,GCP 文档说明:
打开 GCP shell
连接到 GKE 集群
myuser @cloudshell:~ kubectl 获取 pod
myuser @cloudshell:~ kubectl exec -it airflow-worker- 1a2b3c - xyz12 -c airflow-worker -- /bin/bash
这在 Airflow 1.9 上工作正常,但在 Airflow 1.10 上kubectl get pods不显示工作 pod,而且我还没有找到有关如何在 AF 1.10 上通过 kubeclt 访问气流的文档
有人可以帮助我吗?
_myuser_@cloudshell:~ (_Myproject_)$ kubectl get pods
NAME READY STATUS RESTARTS AGE
airflow-monitoring-564c8c7dc5-hxb62 1/1 Running 0 17h
airflow-redis-0 1/1 Running 0 17h
airflow-sqlproxy-594dbf87b7-nmtbh 1/1 …Run Code Online (Sandbox Code Playgroud) 我需要将文件从 FTP 服务器复制到特定的 GCS 位置。我正在使用 ftp_hook 将文件下载到 /data 文件夹中。我需要将此文件移动到不同的 GCS 存储桶,而不是 Composer GCS 存储桶。
我正在尝试使用GoogleCloudStorageToGoogleCloudStorageOperator运算符将文件从 Composer 存储桶复制到所需的存储桶。为此,我需要在 Airflow 任务中读取 Composer 存储桶。我不想将其添加为自定义变量,因为我的作曲家本身是动态创建的。那么如何获取我的数据文件夹所在的composer存储桶的信息呢?
我无法通过 Cloud Shell 以 json 格式编辑气流变量的值。
我正在使用 cloud shell 访问我的气流变量参数(以 json 格式),当我使用以下命令时,它为我提供了完整的 json:
gcloud composer environments run composer001
--location us-east1 variables
--get params
Run Code Online (Sandbox Code Playgroud)
但是我想编辑 json 中的值之一,我如何访问它?
我参考了 google 上的文档和其他各种链接,但是只能找到如何设置不是 json 格式而是单值变量的变量。
我想让我的 Cloud Composer 环境(Google Cloud 的托管 Apache Airflow 服务)在不同的kubernetes 集群上启动pod。我该怎么做?
请注意,Cloud composer 在 kubernetes 集群上运行气流。该集群被认为是作曲家的“环境”。使用 的默认值KubernetesPodOperator,composer 将在自己的集群上调度 pod。但是,在这种情况下,我有一个不同的 kubernetes 集群,我想在其上运行 pod。
我可以连接到工作 Pod 并在gcloud container clusters get-credentials CLUSTERNAME那里运行,但 Pod 时不时会被回收,所以这不是一个持久的解决方案。
我注意到 theKubernetesPodOperator既有 anin_cluster又有cluster_context论点,这似乎很有用。我希望这会起作用:
pod = kubernetes_pod_operator.KubernetesPodOperator(
task_id='my-task',
name='name',
in_cluster=False,
cluster_context='my_cluster_context',
image='gcr.io/my/image:version'
)
Run Code Online (Sandbox Code Playgroud)
但这导致 kubernetes.config.config_exception.ConfigException: Invalid kube-config file. Expected object with name CONTEXTNAME in kube-config/contexts list
虽然如果我kubectl config get-contexts在工作 Pod 中运行,我可以看到列出的集群配置。
所以我无法弄清楚的是:
google-cloud-platform kubernetes airflow google-cloud-composer
我发现 Google Cloud Composer 是非常有前途的托管 Apache Airflow 服务,但我不知道如何使用 Cloud Composer 通过 PySpark 代码执行管道。我可以安装其他 Python 包(例如 Pandas)并使用 Cloud Composer。
任何指针都非常感激。
我正在尝试创建文件夹并将文件写入dags位于 Google Cloud Storage 存储桶中的文件夹中。这使用 Airflow 使用以下 python 代码:
Path(f'/home/airflow/gcs/dags/API/config').mkdir(parents=True, exist_ok=True)
with open(file='/home/airflow/gcs/dags/API/config/config.json', mode = 'w') as out_file:
out_file.write(json_string)
Run Code Online (Sandbox Code Playgroud)
不会引发任何错误,但不会在任何地方创建文件夹或文件。我对实际有效的数据目录尝试了相同的方法
python bucket google-cloud-storage airflow google-cloud-composer
我有几个可以同时运行的任务。当他们完成后,我需要运行最后一个任务。我尝试使用任务分组来做到这一点,如下所示:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
with TaskGroup(group_id='task_group_1') as tg1:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
[task1, task2]
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
tg1 >> final_task
Run Code Online (Sandbox Code Playgroud)
然而,这里发生的情况是 Final_task 在任务组中的每个任务之后运行多次,因此:
任务1 -> 最终任务 任务2 -> 最终任务
我想要的是任务组并行运行,并且当最终任务完成时只运行一次,如下所示:
[任务1,任务2] -> 最终任务
我认为使用任务组可以帮助我完成此要求,但它没有按预期工作。有人可以帮忙吗?谢谢。
编辑:这是 Airflow 文档示例的结果。它导致task3在group.task1和group1.task2之后运行。我需要它在两个分组任务完成后只运行一次。
最后编辑:事实证明我误解了树视图 - 图形视图确认了分组操作,尽管我仍然收到最终任务的一些其他错误。感谢您帮助我了解有关 DAG 的更多信息。
我无法将语句导入为-
from airflow.contrib.operators.gcp_sql_operator import CloudSqlQueryOperator
Run Code Online (Sandbox Code Playgroud)
我想将它导入到我的 DAG 文件中,该文件将在版本为 1.10.0 而不是 1.9.0 的云作曲家气流中运行。这里只是为了检查,我尝试将 gcs_to_gcs 导入为-
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
Run Code Online (Sandbox Code Playgroud)
我可以导入它,但不能导入 gcp_sql_operator。
当 Google Cloud Composer 中的气流中 DAG 运行失败时,我试图向 Tom slack 发送通知。使用的airflow版本是1.9,所以我不能使用slack webhooks。但是当我添加代码时,我收到这个奇怪的错误:没有名为 \'slackclient\' 的模块
\n\n我不知道如何在谷歌云作曲家中进行这项工作。我尝试通过在 Composer 中添加 PyPi 变量来安装 slack 包。但到目前为止没有任何效果。\n有人可以帮忙吗?
\n\n我的代码:
\n\nfrom slackclient import SlackClient\nfrom airflow.operators.slack_operator import SlackAPIPostOperator\n\nslack_channel= \'gsdgsdg\'\nslack_token = \'ssdfhfdrtxcuweiwvbnw54135f543589zdklchvf\xc3\xb6\'\n\ndef task_fail_slack_alert(context):\n\n slack_msg = \\\n """\n :red_circle: Task Failed. \n *Task*: {task} \n *Dag*: {dag} \n *Execution Time*: {exec_date} \n *Log Url*: {log_url} \n """.format(task=context.get(\'task_instance\'\n ).task_id, dag=context.get(\'task_instance\').dag_id,\n ti=context.get(\'task_instance\'),\n exec_date=context.get(\'execution_date\'),\n log_url=context.get(\'task_instance\').log_url)\n\n failed_alert = SlackAPIPostOperator(\n task_id = \'airflow_etl_failed\',\n channel = slack_channel,\n token = slack_token,\n text = slack_msg\n )\n\n\n return …Run Code Online (Sandbox Code Playgroud) 我想在 apache airflow 中安排一个 google cloud bigquery 存储过程。我没有在气流中看到任何文档。我应该使用哪个调度程序在 apache airflow 上调度 bigquery 存储过程。你能给我看一些例子吗?太感谢了。
我们有一个 Python 版本 3 的 GCP Composer。我想将版本更新到 Python 3.9,但我无法找到编辑选项来在我的 Composer 中进行此更改。谁能帮助我实现这一目标?
任何帮助表示赞赏。
airflow ×7
kubernetes ×2
python ×2
apache-spark ×1
bucket ×1
kubectl ×1
slack ×1
terraform ×1
worker ×1