标签: google-cloud-composer

dataproc 上的组件网关激活不适用于composer(airflow)操作符airflow.providers.google.cloud.operators.dataproc

I\xe2\x80\x99m 尝试执行下面的 dag。\n创建 dataproc 集群的操作员似乎无法启用可选组件来启用 jupyter 笔记本和 anaconda。\n我在这里找到了此代码:Component Gateway with DataprocOperator on Airflow尝试解决它,但对我来说它没有解决它,因为我认为这里的作曲家(气流)版本是不同的。我的版本是composer - 2.0.0-preview.5,airflow-2.1.4。

\n

该操作员在创建集群时工作得很好,但它没有使用可选组件来创建以启用 Jupyter Notebook。\n有人有任何想法可以帮助我吗?

\n
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor\nfrom airflow import DAG\nfrom datetime import datetime, timedelta\nfrom airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator,DataprocClusterDeleteOperator, DataProcSparkOperator\nfrom airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator\n\nyesterday = datetime.combine(datetime.today() - timedelta(1),\n                             datetime.min.time())\n\n\ndefault_args = {\n    \'owner\': \'teste3\',\n    \'depends_on_past\': False,\n    \'start_date\' :yesterday,\n    \'email\': [\'airflow@example.com\'],\n    \'email_on_failure\': False,\n    \'email_on_retry\': False,\n    \'retries\': 0,\n    \'retry_delay\': timedelta(minutes=5),\n\n}\n\ndag = DAG(\n    \'teste-dag-3\',catchup=False, default_args=default_args, schedule_interval=None)\n\n\n# configura os componentes\nclass CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):\n\n    def __init__(self, *args, **kwargs):\n        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)\n\n    def _build_cluster_data(self):\n …
Run Code Online (Sandbox Code Playgroud)

google-cloud-platform google-cloud-dataproc airflow google-cloud-composer

5
推荐指数
1
解决办法
902
查看次数

GCP Composer v1.18.6 和 2.0.10 与 CloudSqlProxyRunner 不兼容

在我的Composer Airflow DAG 中,我一直在使用 CloudSqlProxyRunner连接到我的 Cloud SQL 实例。

然而,在将 Google Cloud Composer 从 v1.18.4 更新到 1.18.6 后,我的 DAG 开始遇到一个奇怪的错误:

[2022-04-22, 23:20:18 UTC] {cloud_sql.py:462} INFO - Downloading cloud_sql_proxy from https://dl.google.com/cloudsql/cloud_sql_proxy.linux.x86_64 to /home/airflow/dXhOYoU_cloud_sql_proxy.tmp
[2022-04-22, 23:20:18 UTC] {taskinstance.py:1702} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1330, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1457, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1513, in _execute_task
    result = execute_callable(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/decorators/base.py", line 134, in …
Run Code Online (Sandbox Code Playgroud)

google-cloud-sql airflow cloud-sql-proxy google-cloud-composer

5
推荐指数
1
解决办法
478
查看次数

如何设置 Cloud Composer 以发送电子邮件?

我想通过Cloud Composer 接收电子邮件通知,但我不确定如何做到这一点。如何配置 Composer 环境以发送电子邮件通知?

sendgrid google-cloud-platform google-cloud-composer

4
推荐指数
1
解决办法
4354
查看次数

从Google Cloud Composer运行docker operator

至于文档,Google Cloud Composer气流工作者节点由专用的kubernetes集群提供:

在此输入图像描述

我有一个Docker包含ETL步骤,我想使用气流运行,最好是在专用群集上托管Workers OR的相同Kubernetes.

Docker Operation从Cloud Composer气流环境开始,最佳做法是什么?

务实的解决方案是❤️

google-cloud-platform google-cloud-composer

4
推荐指数
1
解决办法
1382
查看次数

我可以重新启动 Cloud Composer 环境吗?

Google Cloud现在使用 Composer 几天了,主要是将数据从 MySQL 移动到 BigQuery,它运行良好。

在某个时候,它停止工作:

  • 运行任务运行很长时间然后失败

  • 任务不开始

  • 新达格有评论This DAG seems to be existing only locally. The master scheduler doesn't seem to be aware of its existence

我已经使用 Airflow Web UI 清理了所有 DAG 运行和任务实例,但仍然无法工作。

有没有办法重新启动环境而不丢失已完成任务的结果?还有其他方法可以手动运行气流吗?

google-cloud-platform airflow google-cloud-composer

4
推荐指数
1
解决办法
4024
查看次数

如何在 Airflow 中将 Spark 作业属性传递给 DataProcSparkOperator?

我正在尝试使用 Airflow 的DataProcSparkOperator在 Dataproc 上执行 Spark jar 。该 jar 位于 GCS 上,我正在动态创建 Dataproc 集群,然后在新创建的 Dataproc 集群上执行此 jar。

我能够与气流的DataProcSparkOperator使用默认设置执行此,但我不能够配置星火作业属性(例如--master--deploy-mode--driver-memory等)。从气流的文档中没有得到任何帮助。也尝试了很多东西,但没有成功。帮助表示赞赏。

apache-spark google-cloud-dataproc airflow airflow-scheduler google-cloud-composer

4
推荐指数
1
解决办法
2479
查看次数

使用Dataflow与Cloud Composer

我为这个天真的问题道歉,但我想澄清一下Cloud Dataflow或Cloud Composer是否适合这项工作,我不清楚Google文档.

目前,我正在使用Cloud Dataflow读取非标准csv文件 - 执行一些基本处理 - 并将其加载到BigQuery中.

让我举一个非常基本的例子:

# file.csv
type\x01date
house\x0112/27/1982
car\x0111/9/1889
Run Code Online (Sandbox Code Playgroud)

从这个文件中我们检测到模式并创建一个BigQuery表,如下所示:

`table`
type (STRING)
date (DATE)
Run Code Online (Sandbox Code Playgroud)

而且,我们还格式化我们的数据以插入(在python中)到BigQuery:

DATA = [
    ("house", "1982-12-27"),
    ("car", "1889-9-11")
]
Run Code Online (Sandbox Code Playgroud)

这是对正在发生的事情的极大简化,但这就是我们目前使用Cloud Dataflow的方式.

那么我的问题是,Cloud Composer图片中的位置是什么?它可以在上面提供哪些附加功能?换句话说,为什么它会在"云数据流"之上使用?

google-cloud-dataflow airflow apache-beam google-cloud-composer

4
推荐指数
2
解决办法
3470
查看次数

如何控制Airflow DAG的并行性或并发性?

在我的某些Airflow安装中,即使未完全加载调度程序,调度运行的DAG或任务也不会运行。如何增加可以同时运行的DAG或任务的数量?

同样,如果我的安装负载很高,并且我想限制Airflow工作人员拉出排队任务的速度,我该如何调整?

python airflow google-cloud-composer

4
推荐指数
2
解决办法
1832
查看次数

为什么 Cloud Composer 中会自动生成名为“airflow_monitoring”的 DAG?

在 GCP Composer 上创建 Airflow 环境时,会airflow_monitoring自动创建一个名为 DAG 的 DAG ,即使删除它也会返回。

为什么?如何处理?我应该将此文件复制到我的 DAG 文件夹中并辞职以使其成为我代码的一部分吗?我注意到每次上传代码时它都会停止执行此 DAG,因为在它神奇地重新出现之前无法在 DAG 文件夹中找到它。

我已经尝试在 DAG 文件夹中删除它,删除日志,从 UI 中删除它,所有这些同时等等。

google-cloud-platform airflow google-cloud-stackdriver google-cloud-composer

4
推荐指数
1
解决办法
1881
查看次数

为什么在创建 Cloud Composer 环境时会自动创建 2 个 Pub/Sub 主题和订阅

我注意到在创建云 Composer 环境时会自动创建 2 个 Pub/Sub 主题和订阅,那么这里需要 pub/sub 是什么,Composer 的内部架构如何与 Pub/Sub 相关。

我需要这个概念上的澄清,因为我没有找到任何文档解释这一点。

我明白,cloudcomposer 使用 pub/sub 订阅与其 Kubernetes Engine 服务代理进行通信,但我的问题是为什么它默认创建 2 个主题而不是一个,我还注意到,当我从 cloudcomposer 更改 kubernetes 配置时(例如更改kubernetes 集群的节点数)/更新集群值,它再次创建 2 个其他主题和订阅,所以我想了解它的内部工作原理,为什么它在每次更新后创建新主题和订阅,为什么它不使用退出主题/订阅。还有 Composer 和 Kubernetes Engine 服务代理如何通过 pub/sub 进行通信,这些其他 GCP 组件是否都是自动部署的,我想知道整个内部架构。

我还想了解一件事,GKE 集群中用于 Composer 的功能“airflow-redis-0”pod 是什么?它仅用于消息队列还是充当调度程序和工作人员之间的通信?有什么方法可以在这里检查/可视化(通过 redis-cli 命令)Redis pod 的所有功能吗?

提前致谢。

google-cloud-platform google-cloud-pubsub airflow google-cloud-composer

4
推荐指数
1
解决办法
813
查看次数