我们最近遇到了一个关于气流的已知问题:
现在我们使用临时解决方案通过更改配置来重新启动整个环境,但这不是一种有效的方法.现在我们认为最好的解决方法是重新启动云编写器上的Web服务器,但我们没有找到任何重启webserver的命令.这可能是一个动作吗?
谢谢!
当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?
我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?
python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator
目前正在致力于为 Airflow 中长时间运行的任务设置警报。为了取消/使气流 dag 失败,我在 default_args 中放入了“dagrun_timeout”,它执行了我需要的操作,当 dag 运行时间过长(通常被卡住)时,它会失败/出错。唯一的问题是,当超过 dagrun_timeout 时,“on_failure_callback”中的函数不会被调用,因为“on_failure_callback”位于任务级别(我认为),而 dagrun_timeout 位于 dag 级别。
当超过 dagrun_timeout 时,如何执行“on_failure_callback”,或者如何指定 dag 失败时要调用的函数?或者我应该重新考虑我的方法?
我正在尝试在 VS Code 中设置本地开发环境,在其中我可以获得 Cloud Composer/Apache Airflow 使用的包的代码完成。到目前为止,我已经成功地使用了虚拟环境(使用 创建python -m venv .venv)和一个非常小的requirements.txt文件,该文件仅包含安装到本地环境中的 Airflow 包。
该文件是这样的:
apache-airflow==1.10.15
Run Code Online (Sandbox Code Playgroud)
我可以pip install -r requirements.txt在 VS Code 中激活我的虚拟环境后运行,将其安装到我的虚拟环境中,之后我在 VS Code 中获得其文档中快速入门 DAG 的代码完成,即BashOperator:

随着我学习更多教程,我希望获得更多代码补全。例如,按照KubernetesPodOperator教程(https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator),我收到此错误,并且 VS Code 无法识别导入:
无法解析导入“airflow.providers.cncf.kubernetes.operators.kubernetes_pod”Pylance(reportMissingImports)
我认为下一步最好是将与在 Cloud Composer 环境中运行的完全相同的 PyPI 包安装到我的虚拟环境中。我使用页面https://cloud.google.com/composer/docs/concepts/versioning/composer-versions来查看安装了哪些软件包:
所以我的requirements.txt文件看起来像这样:
absl-py==1.0.0
alembic==1.5.7
amqp==2.6.1
apache-airflow==1.10.15+composer
apache-airflow-backport-providers-apache-beam==2021.3.13
apache-airflow-backport-providers-cncf-kubernetes==2021.3.3
apache-airflow-backport-providers-google==2022.4.1+composer
apache-beam==2.37.0
apispec==1.3.3
appdirs==1.4.4
argcomplete==1.12.2
astunparse==1.6.3
attrs==20.3.0
Babel==2.9.0
bcrypt==3.2.0
billiard==3.6.3.0
cached-property==1.5.2
cachetools==4.2.1
cattrs==1.1.2
celery==4.4.7
certifi==2020.12.5
cffi==1.14.5
chardet==4.0.0
click==6.7 …Run Code Online (Sandbox Code Playgroud) python pip google-cloud-platform airflow google-cloud-composer
我们有哪些方法可以从新推出的Google Cloud Composer连接到Google Cloud SQL(MySQL)实例?目的是将Cloud SQL实例中的数据导入BigQuery(可能通过云存储实现中间步骤).
Cloud SQL代理是否可以在托管上以某种方式暴露给托管Composer的Kubernetes集群?
如果没有,可以使用Kubernetes Service Broker引入Cloud SQL Proxy吗?- > https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker
应该使用Airflow来安排和调用GCP API命令,例如1)将mysql表导出到云存储2)读取mysql导出到bigquery?
也许还有其他方法让我无法完成这项工作
google-cloud-sql google-cloud-platform airflow google-cloud-composer
我正在尝试使用服务帐户密钥在 Google Cloud Composer 中设置 Google Cloud Platform 连接。所以我创建了一个 GCS 存储桶并将服务帐户密钥文件放在存储桶中。密钥存储在 JSON 中。在密钥文件路径字段中,我指定了一个 GCS 存储桶,在密钥文件 JSON 字段中,我指定了文件名。范围是https://www.googleapis.com/auth/cloud-platform。
尝试使用此连接启动 Dataproc 集群时,出现找不到 JSON 文件的错误。
查看错误消息,代码尝试使用以下方法解析文件:
with open(filename, 'r') as file_obj这显然不适用于 GCS 存储桶路径。
所以我的问题是,如果不能将这个服务帐户密钥文件放在 GCS 路径中,我应该把它放在哪里?
经过一些研究和测试,我们决定开始使用 Google Cloud Composer。由于我们当前的 DAG 和任务相对较小,并且不需要服务器连续运行,因此我正在寻找如何管理成本。
两个问题:
preemptible虚拟机的选项似乎合乎逻辑。这大大节省了成本,我正在考虑选择 3x n1-standard-4。我希望每个任务都很短,所以不要认为这会对我们的工作负载产生重大影响。是否可以将preemptibleVM 与 Composer一起使用?帮助,有人吗?
我编写了一个气流插件,它只包含一个自定义运算符(以支持BigQuery中的CMEK).我可以使用单个任务创建一个简单的DAG,该任务使用此运算符并且执行正常.
但是,如果我尝试在DAG中从DummyOperator任务创建依赖关系到我的自定义操作员任务,则DAG无法在UI中加载并抛出以下错误,我无法理解为什么会抛出此错误?
破坏的DAG:[/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py]关系只能在运营商之间设置; 收到BQCMEKOperator
到目前为止,我已经在composer-1.4.2-airflow-1.9.0,composer-1.4.2-airflow-1.10.0和composer-1.4.1-airflow-1.10.0上进行了测试.
每个任务的运行气流测试都可以顺利完成.
在DAG中单独使用它可以正常工作(如下所示)所以我不相信插件本身存在任何错误
import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
default_dag_args = {
'start_date': datetime.datetime(2019,1,1),
'retries': 0
}
dag = DAG(
'js_bq_custom_plugin',
schedule_interval=None,
catchup=False,
concurrency=1,
max_active_runs=1,
default_args=default_dag_args)
run_this = BQCMEKOperator(
task_id = 'cmek_plugin_test',
sql = 'select * from ds.foo LIMIT 15',
project = 'xxx',
dataset = 'js_dev',
table = 'cmek_test10',
key = 'xxx',
dag = dag
)
Run Code Online (Sandbox Code Playgroud)
然而,如果我引入DummyOperator和依赖项,则会发生错误
import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from …Run Code Online (Sandbox Code Playgroud) 我目前正在学习 GCP 数据工程师考试,并且一直在努力了解何时使用 Cloud Scheduler 以及何时使用 Cloud Composer。
通过阅读文档,我的印象是,当作业之间存在相互依赖关系时,应该使用 Cloud Composer,例如,我们需要一个作业的输出在第一个完成时启动另一个作业,并使用来自第一个作业的依赖项。然后,您可以根据需要灵活地链接任意数量的这些“工作流”,并提供在失败时重新启动作业、运行批处理作业、shell 脚本、链查询等的机会。
对于 Cloud Scheduler,它在可以执行哪些任务方面具有非常相似的功能,但是,它更多地用于常规作业,您可以定期执行,并且在作业之间存在相互依赖关系或在开始另一个工作之前,您需要等待其他工作。因此,似乎更适合用于“更简单”的任务。
这些想法是在尝试回答我发现的一些考试问题之后产生的。然而,我对我找到的“正确答案”感到惊讶,并希望有人能澄清这些答案是否正确,以及我是否理解何时使用一个而不是另一个。
以下是有关此主题的让我感到困惑的示例问题:
问题 1
您正在实施几个必须按计划执行的批处理作业。这些作业有许多必须按特定顺序执行的相互依赖的步骤。部分作业涉及执行 shell 脚本、运行 Hadoop 作业以及在 BigQuery 中运行查询。这些作业预计会运行几分钟到几个小时。如果这些步骤失败,则必须重试固定次数。您应该使用哪种服务来管理这些作业的执行?
A. 云调度器
B. 云数据流
C. 云功能
D. 云作曲家
正确答案:A
问题2
您想要自动执行在 GCP 上运行的多步骤数据管道。该管道包括相互具有多个依赖关系的 Cloud Dataproc 和 Cloud Dataflow 作业。您希望尽可能使用托管服务,并且管道将每天运行。您应该使用哪种工具?
A. cron
B. 云作曲家
C. 云调度器
D. Cloud Dataproc 上的工作流模板
正确答案:D
问题 3
您的公司有一个混合云计划。您有一个复杂的数据管道,可以在云提供商服务之间移动数据并利用每个云提供商的服务。您应该使用哪种云原生服务来编排整个管道?
A. 云数据流
B. 云作曲家
C. 云数据准备
D. 云数据过程
正确答案:D
对此的任何见解将不胜感激。谢谢 !
google-cloud-platform google-cloud-composer google-cloud-scheduler
我在 Google Cloud 中使用 Composer (Airflow)。我想创建一个新环境,并将旧环境中相同的 DAG 和变量带到新环境中。
为此,我执行以下操作:
gsutil相同的 DAG 并将其上传到新环境但是,在新环境中,由于FERNET_KEY configuration is missing. 我最好的猜测是,这与导入使用单独的 Fernet 密钥加密的变量有关,但我不确定。
有没有人遇到过这个问题?如果是这样,你是如何解决的?