标签: google-cloud-composer

从 Cloud Function 的文件到达事件触发 Composer DAG 上的任务

我可以通过云函数触发气流任务吗?

基本上我的问题是这样的。我有一些文件到达谷歌云存储。同一 DAG 中的多个文件。文件到达时我需要触发转换作业。我想使用云功能。但我的 DAG 中有很多依赖作业。

任何帮助表示赞赏

airflow google-cloud-functions google-cloud-composer

2
推荐指数
1
解决办法
7270
查看次数

Pod 启动失败:Pod 启动时间太长,无法运行 KubernetesPodOperator 密钥

我正在使用以下链接运行 KubernetesPodOperator 密钥的快速入门:https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator

下面使用的代码:


from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure. …
Run Code Online (Sandbox Code Playgroud)

google-kubernetes-engine airflow airflow-scheduler google-cloud-composer

2
推荐指数
1
解决办法
2万
查看次数

从应在 Airflow 中按顺序运行的函数返回任务列表

我想从一个函数返回 2 个或更多任务,这些任务应该在它们插入依赖项的位置按顺序运行,请参见下文。

t1 = PythonOperator()

def generate_tasks():
    t2 = PythonOperator()
    t3 = PythonOperator()
    return magic(t2, t3) # magic needed here (preferably)

t1 >> generate_tasks() # otherwise here
# desired result: t1 >> t2 >> t3
Run Code Online (Sandbox Code Playgroud)

这可行吗?据我了解,Airflow 2.0 似乎通过 TaskGroup 实现了这一点,但我们使用的是 Google 的 Composer,2.0 暂时不会可用。

我发现的最佳解决方法:

t1 = PythonOperator()

def generate_tasks():
    t2 = PythonOperator()
    t3 = PythonOperator()
    return [t2, t3]

tasks = generate_tasks()
t1 >> tasks[0] >> tasks[1]
Run Code Online (Sandbox Code Playgroud)

但我真的希望将其抽象化,因为它或多或少违背了从单个函数返回多个运算符的目的。我们希望它是最终用户所知的单个单元,即使它可以由 2 个或更多任务组成。

如何使用 Airflow 2.0 中的 TaskGroup 来完成此操作:

class Encryptor:
    def …
Run Code Online (Sandbox Code Playgroud)

airflow google-cloud-composer

2
推荐指数
1
解决办法
3959
查看次数

在 Google Cloud Composer 中运行气流 cli

我尝试在 Google Cloud Composer 环境中运行气流测试 cli,但它不起作用。

基本上,我想airflow test在气流环境中运行以测试任务。我正在按照这里的说明操作:https : //cloud.google.com/composer/docs/how-to/accessing/airflow-cli

这是我运行的命令:

gcloud beta composer environments run ENVIRONMENT_NAME test MY_DAG FIRST_TASK 2018-05-05
Run Code Online (Sandbox Code Playgroud)

输出:

ERROR: (gcloud.beta.composer.environments.run) unrecognized arguments:

google-cloud-platform google-cloud-composer

1
推荐指数
1
解决办法
2365
查看次数

如何清除气流/合成器中使用CLI的dag运行状态?

我以为可以使用以下命令:

g beta composer environments run <env> --location=us-central1 clear -- <dag_id> -s 2018-05-13 -e 2018-05-14
Run Code Online (Sandbox Code Playgroud)

dag的状态明确运行于2018-05-13。由于某种原因,它不起作用。发生的情况是CLI挂在一条消息上,例如:

kubeconfig entry generated for <kube node name>.
Run Code Online (Sandbox Code Playgroud)

上面命令的预期行为是什么?我希望它能清除间隔内的dag运行,但我可能做错了。

airflow airflow-scheduler google-cloud-composer

1
推荐指数
1
解决办法
3381
查看次数

使用 apache-airflow (cloud-composer) 调度从 BigQuery 到 MongoDB 的负载

我正在尝试设置将数据从 GCS 移动到 BigQuery 的数据管道,执行某些任务/处理并将其加载到 MongoDB 集群(所有这些都使用 DAG 在 python 中设置)。在加载到 MongoDB 之前,我一直能够实现这一目标。是否有任何现有的气流操作员可以做到这一点?如果没有,是否可以使用气流中提供的 mongoDB 钩子创建自定义代码?

谢谢,GT

编辑 1

我使用了(下面的代码片段)中的MongoHook源代码BigQueryGetDataOperator。我现在的问题是我需要为 10++ 百万行做这项工作,当我增加max_results='100'默认值时,BigQueryGetDataOperator我收到一个错误:

sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.DataError) (1406, "Data too long for column 'value' at row 1")

我知道我应该将数据推送到 XCom 中,chunks但不确定是否真的可以做到。有没有标准的方法来解析 Xcom 中的大量数据?使用 Airflow 实现这一目标的任何其他替代方案也将有所帮助。我唯一能想到的就是将数据写入 GCS,加载到 MongoDB …

python mongodb google-bigquery airflow google-cloud-composer

1
推荐指数
1
解决办法
1783
查看次数

未找到所需的 GKE pod,谷歌云作曲家

我正在使用谷歌云作曲家,并创建了作曲家环境。作曲家环境准备好了(有绿色勾号),现在我正在尝试使用谷歌云外壳设置 DAG python 代码中使用的变量。

设置变量的命令:

     gcloud composer environments run test-environment \
       --location us-central1 variables -- \
       --set gcp_project xxx-gcp
Run Code Online (Sandbox Code Playgroud)

确切的错误信息:

  ERROR: (gcloud.composer.environments.run) Desired GKE pod not found. If the environment was recently started, please wait and retry.
Run Code Online (Sandbox Code Playgroud)

我尝试将事情作为调查的一部分,但每次都遇到相同的错误。我使用 UI 而不是 google shell 命令创建了一个新环境。我检查了 kubernetes 引擎中的 pod,都是绿色的,没有发现任何问题。我验证了 Composer API、Billing kubernetes,所有必需的 API 都已启用。

我分配了“编辑”角色。

添加了我第一次看到一些失败的截图

在此处输入图片说明

在此处输入图片说明

错误退出代码 1 谷歌故障排除指南描述:如果退出代码为 1,则容器因应用程序崩溃而崩溃。

google-cloud-platform kubernetes google-kubernetes-engine airflow google-cloud-composer

1
推荐指数
1
解决办法
1923
查看次数

检查blob是否存在

我正在尝试构建一个 DAG,它首先检查 Google Cloud Storage 中是否存在给定的路径/blob。blob 是包含一些 RAW 数据的那个,而不是安装在 Composer 工作器上的那个。

或者,一次性查看它是否存在并包含文件(列表> 1)会很方便,但存在已经是一件好事。

到目前为止,我尝试通过 bash 命令、google.cloud.storage 库和 gcs_hook 使用 gsutil stats 都无济于事。所有这些都为一个文件夹返回 False 我很确定存在

def check_folder(templates_dict,**kwargs):
    bucket = 'bucketname'
    blob_name = templates_dict['blob_name']
    # Blob name is something along the lines of '2019-04-10/11/' 
    gcs = GoogleCloudStorageHook()
    flag = gcs.exists(bucket,blob_name)
    if flag:
        print(flag)
        return('this_is_true')
    else:
        print(flag)
        return('this_is_not_true')
Run Code Online (Sandbox Code Playgroud)

对于给定的 blob_name,我很确定存在,我期待一个 true,但它总是返回 False。知道发生了什么吗?谢谢!

google-cloud-storage airflow google-cloud-composer

1
推荐指数
1
解决办法
1286
查看次数

没有名为 providers.google.cloud.operators.pubsub 的模块:Google Cloud Composer

我正在尝试设置一个 DAG 来响应 Cloud Pub/Sub 消息。我需要在 DAG 代码中添加以下导入语句:

from airflow.providers.google.cloud.operators.pubsub import (
PubSubCreateSubscriptionOperator, PubSubCreateTopicOperator, PubSubDeleteSubscriptionOperator,
PubSubDeleteTopicOperator, PubSubPublishMessageOperator,
)
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
Run Code Online (Sandbox Code Playgroud)

DAG 导入失败,因为它无法解析依赖项。谁能告诉我所需的依赖项以及如何将它们引入 Cloud Composer 环境?

google-cloud-platform google-cloud-pubsub airflow google-cloud-composer

1
推荐指数
1
解决办法
1745
查看次数

Google Cloud Composer 上的 Airflow 与 Docker

我找不到很多关于在 Google Cloud Composer 和 Docker 上运行 Airflow 的差异的信息。我正在尝试将我们目前在 Google Cloud Composer 上的数据管道切换到 Docker 以仅在本地运行,但我正在尝试概念化区别是什么。

local docker airflow data-pipeline google-cloud-composer

1
推荐指数
1
解决办法
1867
查看次数