I\xe2\x80\x99m 尝试执行下面的 dag。\n创建 dataproc 集群的操作员似乎无法启用可选组件来启用 jupyter 笔记本和 anaconda。\n我在这里找到了此代码:Component Gateway with DataprocOperator on Airflow尝试解决它,但对我来说它没有解决它,因为我认为这里的作曲家(气流)版本是不同的。我的版本是composer - 2.0.0-preview.5,airflow-2.1.4。
\n该操作员在创建集群时工作得很好,但它没有使用可选组件来创建以启用 Jupyter Notebook。\n有人有任何想法可以帮助我吗?
\nfrom airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor\nfrom airflow import DAG\nfrom datetime import datetime, timedelta\nfrom airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator,DataprocClusterDeleteOperator, DataProcSparkOperator\nfrom airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator\n\nyesterday = datetime.combine(datetime.today() - timedelta(1),\n datetime.min.time())\n\n\ndefault_args = {\n \'owner\': \'teste3\',\n \'depends_on_past\': False,\n \'start_date\' :yesterday,\n \'email\': [\'airflow@example.com\'],\n \'email_on_failure\': False,\n \'email_on_retry\': False,\n \'retries\': 0,\n \'retry_delay\': timedelta(minutes=5),\n\n}\n\ndag = DAG(\n \'teste-dag-3\',catchup=False, default_args=default_args, schedule_interval=None)\n\n\n# configura os componentes\nclass CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):\n\n def __init__(self, *args, **kwargs):\n super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)\n\n def _build_cluster_data(self):\n …Run Code Online (Sandbox Code Playgroud) google-cloud-platform google-cloud-dataproc airflow google-cloud-composer
在我的Composer Airflow DAG 中,我一直在使用 CloudSqlProxyRunner连接到我的 Cloud SQL 实例。
然而,在将 Google Cloud Composer 从 v1.18.4 更新到 1.18.6 后,我的 DAG 开始遇到一个奇怪的错误:
[2022-04-22, 23:20:18 UTC] {cloud_sql.py:462} INFO - Downloading cloud_sql_proxy from https://dl.google.com/cloudsql/cloud_sql_proxy.linux.x86_64 to /home/airflow/dXhOYoU_cloud_sql_proxy.tmp
[2022-04-22, 23:20:18 UTC] {taskinstance.py:1702} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1330, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1457, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1513, in _execute_task
result = execute_callable(context=context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/decorators/base.py", line 134, in …Run Code Online (Sandbox Code Playgroud) google-cloud-sql airflow cloud-sql-proxy google-cloud-composer
我想通过Cloud Composer 接收电子邮件通知,但我不确定如何做到这一点。如何配置 Composer 环境以发送电子邮件通知?
至于文档,Google Cloud Composer气流工作者节点由专用的kubernetes集群提供:
我有一个Docker包含ETL步骤,我想使用气流运行,最好是在专用群集上托管Workers OR的相同Kubernetes.
Docker Operation从Cloud Composer气流环境开始,最佳做法是什么?
务实的解决方案是❤️
我Google Cloud现在使用 Composer 几天了,主要是将数据从 MySQL 移动到 BigQuery,它运行良好。
在某个时候,它停止工作:
运行任务运行很长时间然后失败
任务不开始
新达格有评论This DAG seems to be existing only locally. The master scheduler doesn't seem to be aware of its existence
我已经使用 Airflow Web UI 清理了所有 DAG 运行和任务实例,但仍然无法工作。
有没有办法重新启动环境而不丢失已完成任务的结果?还有其他方法可以手动运行气流吗?
我正在尝试使用 Airflow 的DataProcSparkOperator在 Dataproc 上执行 Spark jar 。该 jar 位于 GCS 上,我正在动态创建 Dataproc 集群,然后在新创建的 Dataproc 集群上执行此 jar。
我能够与气流的DataProcSparkOperator使用默认设置执行此,但我不能够配置星火作业属性(例如--master,--deploy-mode,--driver-memory等)。从气流的文档中没有得到任何帮助。也尝试了很多东西,但没有成功。帮助表示赞赏。
apache-spark google-cloud-dataproc airflow airflow-scheduler google-cloud-composer
我为这个天真的问题道歉,但我想澄清一下Cloud Dataflow或Cloud Composer是否适合这项工作,我不清楚Google文档.
目前,我正在使用Cloud Dataflow读取非标准csv文件 - 执行一些基本处理 - 并将其加载到BigQuery中.
让我举一个非常基本的例子:
# file.csv
type\x01date
house\x0112/27/1982
car\x0111/9/1889
Run Code Online (Sandbox Code Playgroud)
从这个文件中我们检测到模式并创建一个BigQuery表,如下所示:
`table`
type (STRING)
date (DATE)
Run Code Online (Sandbox Code Playgroud)
而且,我们还格式化我们的数据以插入(在python中)到BigQuery:
DATA = [
("house", "1982-12-27"),
("car", "1889-9-11")
]
Run Code Online (Sandbox Code Playgroud)
这是对正在发生的事情的极大简化,但这就是我们目前使用Cloud Dataflow的方式.
那么我的问题是,Cloud Composer图片中的位置是什么?它可以在上面提供哪些附加功能?换句话说,为什么它会在"云数据流"之上使用?
google-cloud-dataflow airflow apache-beam google-cloud-composer
在我的某些Airflow安装中,即使未完全加载调度程序,调度运行的DAG或任务也不会运行。如何增加可以同时运行的DAG或任务的数量?
同样,如果我的安装负载很高,并且我想限制Airflow工作人员拉出排队任务的速度,我该如何调整?
在 GCP Composer 上创建 Airflow 环境时,会airflow_monitoring自动创建一个名为 DAG 的 DAG ,即使删除它也会返回。
为什么?如何处理?我应该将此文件复制到我的 DAG 文件夹中并辞职以使其成为我代码的一部分吗?我注意到每次上传代码时它都会停止执行此 DAG,因为在它神奇地重新出现之前无法在 DAG 文件夹中找到它。
我已经尝试在 DAG 文件夹中删除它,删除日志,从 UI 中删除它,所有这些同时等等。
google-cloud-platform airflow google-cloud-stackdriver google-cloud-composer
我注意到在创建云 Composer 环境时会自动创建 2 个 Pub/Sub 主题和订阅,那么这里需要 pub/sub 是什么,Composer 的内部架构如何与 Pub/Sub 相关。
我需要这个概念上的澄清,因为我没有找到任何文档解释这一点。
我明白,cloudcomposer 使用 pub/sub 订阅与其 Kubernetes Engine 服务代理进行通信,但我的问题是为什么它默认创建 2 个主题而不是一个,我还注意到,当我从 cloudcomposer 更改 kubernetes 配置时(例如更改kubernetes 集群的节点数)/更新集群值,它再次创建 2 个其他主题和订阅,所以我想了解它的内部工作原理,为什么它在每次更新后创建新主题和订阅,为什么它不使用退出主题/订阅。还有 Composer 和 Kubernetes Engine 服务代理如何通过 pub/sub 进行通信,这些其他 GCP 组件是否都是自动部署的,我想知道整个内部架构。
我还想了解一件事,GKE 集群中用于 Composer 的功能“airflow-redis-0”pod 是什么?它仅用于消息队列还是充当调度程序和工作人员之间的通信?有什么方法可以在这里检查/可视化(通过 redis-cli 命令)Redis pod 的所有功能吗?
提前致谢。
google-cloud-platform google-cloud-pubsub airflow google-cloud-composer