我可以通过云函数触发气流任务吗?
基本上我的问题是这样的。我有一些文件到达谷歌云存储。同一 DAG 中的多个文件。文件到达时我需要触发转换作业。我想使用云功能。但我的 DAG 中有很多依赖作业。
任何帮助表示赞赏
我正在使用以下链接运行 KubernetesPodOperator 密钥的快速入门:https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator
下面使用的代码:
from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator
# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure. …google-kubernetes-engine airflow airflow-scheduler google-cloud-composer
我想从一个函数返回 2 个或更多任务,这些任务应该在它们插入依赖项的位置按顺序运行,请参见下文。
t1 = PythonOperator()
def generate_tasks():
    t2 = PythonOperator()
    t3 = PythonOperator()
    return magic(t2, t3) # magic needed here (preferably)
t1 >> generate_tasks() # otherwise here
# desired result: t1 >> t2 >> t3
这可行吗?据我了解,Airflow 2.0 似乎通过 TaskGroup 实现了这一点,但我们使用的是 Google 的 Composer,2.0 暂时不会可用。
我发现的最佳解决方法:
t1 = PythonOperator()
def generate_tasks():
    t2 = PythonOperator()
    t3 = PythonOperator()
    return [t2, t3]
tasks = generate_tasks()
t1 >> tasks[0] >> tasks[1]
但我真的希望将其抽象化,因为它或多或少违背了从单个函数返回多个运算符的目的。我们希望它是最终用户所知的单个单元,即使它可以由 2 个或更多任务组成。
如何使用 Airflow 2.0 中的 TaskGroup 来完成此操作:
class Encryptor:
    def …我尝试在 Google Cloud Composer 环境中运行气流测试 cli,但它不起作用。
基本上,我想airflow test在气流环境中运行以测试任务。我正在按照这里的说明操作:https : //cloud.google.com/composer/docs/how-to/accessing/airflow-cli
这是我运行的命令:
gcloud beta composer environments run ENVIRONMENT_NAME test MY_DAG FIRST_TASK 2018-05-05
输出:
ERROR: (gcloud.beta.composer.environments.run) unrecognized arguments:
我以为可以使用以下命令:
g beta composer environments run <env> --location=us-central1 clear -- <dag_id> -s 2018-05-13 -e 2018-05-14
dag的状态明确运行于2018-05-13。由于某种原因,它不起作用。发生的情况是CLI挂在一条消息上,例如:
kubeconfig entry generated for <kube node name>.
上面命令的预期行为是什么?我希望它能清除间隔内的dag运行,但我可能做错了。
我正在尝试设置将数据从 GCS 移动到 BigQuery 的数据管道,执行某些任务/处理并将其加载到 MongoDB 集群(所有这些都使用 DAG 在 python 中设置)。在加载到 MongoDB 之前,我一直能够实现这一目标。是否有任何现有的气流操作员可以做到这一点?如果没有,是否可以使用气流中提供的 mongoDB 钩子创建自定义代码?
谢谢,GT
编辑 1
我使用了(下面的代码片段)中的MongoHook源代码BigQueryGetDataOperator。我现在的问题是我需要为 10++ 百万行做这项工作,当我增加max_results='100'默认值时,BigQueryGetDataOperator我收到一个错误:
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.DataError) (1406, "Data too long for column 'value' at row 1")
我知道我应该将数据推送到 XCom 中,chunks但不确定是否真的可以做到。有没有标准的方法来解析 Xcom 中的大量数据?使用 Airflow 实现这一目标的任何其他替代方案也将有所帮助。我唯一能想到的就是将数据写入 GCS,加载到 MongoDB …
python mongodb google-bigquery airflow google-cloud-composer
我正在使用谷歌云作曲家,并创建了作曲家环境。作曲家环境准备好了(有绿色勾号),现在我正在尝试使用谷歌云外壳设置 DAG python 代码中使用的变量。
设置变量的命令:
     gcloud composer environments run test-environment \
       --location us-central1 variables -- \
       --set gcp_project xxx-gcp
确切的错误信息:
  ERROR: (gcloud.composer.environments.run) Desired GKE pod not found. If the environment was recently started, please wait and retry.
我尝试将事情作为调查的一部分,但每次都遇到相同的错误。我使用 UI 而不是 google shell 命令创建了一个新环境。我检查了 kubernetes 引擎中的 pod,都是绿色的,没有发现任何问题。我验证了 Composer API、Billing kubernetes,所有必需的 API 都已启用。
我分配了“编辑”角色。
添加了我第一次看到一些失败的截图
错误退出代码 1 谷歌故障排除指南描述:如果退出代码为 1,则容器因应用程序崩溃而崩溃。
google-cloud-platform kubernetes google-kubernetes-engine airflow google-cloud-composer
我正在尝试构建一个 DAG,它首先检查 Google Cloud Storage 中是否存在给定的路径/blob。blob 是包含一些 RAW 数据的那个,而不是安装在 Composer 工作器上的那个。
或者,一次性查看它是否存在并包含文件(列表> 1)会很方便,但存在已经是一件好事。
到目前为止,我尝试通过 bash 命令、google.cloud.storage 库和 gcs_hook 使用 gsutil stats 都无济于事。所有这些都为一个文件夹返回 False 我很确定存在
def check_folder(templates_dict,**kwargs):
    bucket = 'bucketname'
    blob_name = templates_dict['blob_name']
    # Blob name is something along the lines of '2019-04-10/11/' 
    gcs = GoogleCloudStorageHook()
    flag = gcs.exists(bucket,blob_name)
    if flag:
        print(flag)
        return('this_is_true')
    else:
        print(flag)
        return('this_is_not_true')
对于给定的 blob_name,我很确定存在,我期待一个 true,但它总是返回 False。知道发生了什么吗?谢谢!
我正在尝试设置一个 DAG 来响应 Cloud Pub/Sub 消息。我需要在 DAG 代码中添加以下导入语句:
from airflow.providers.google.cloud.operators.pubsub import (
PubSubCreateSubscriptionOperator, PubSubCreateTopicOperator, PubSubDeleteSubscriptionOperator,
PubSubDeleteTopicOperator, PubSubPublishMessageOperator,
)
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
DAG 导入失败,因为它无法解析依赖项。谁能告诉我所需的依赖项以及如何将它们引入 Cloud Composer 环境?
google-cloud-platform google-cloud-pubsub airflow google-cloud-composer
我找不到很多关于在 Google Cloud Composer 和 Docker 上运行 Airflow 的差异的信息。我正在尝试将我们目前在 Google Cloud Composer 上的数据管道切换到 Docker 以仅在本地运行,但我正在尝试概念化区别是什么。