标签: google-cloud-composer

GCP Dataproc 集群上的工作流调度

我有一些复杂的 Oozie 工作流程需要从本地 Hadoop 迁移到 GCP Dataproc。工作流程由 shell 脚本、Python 脚本、Spark-Scala 作业、Sqoop 作业等组成。

我遇到了一些包含我的工作流程调度需求的潜在解决方案:

  1. 云作曲家
  2. 具有云计划功能的 Dataproc 工作流模板
  3. 在 Dataproc 自动扩展集群上安装 Oozie

请让我知道哪种选项在性能、成本和迁移复杂性方面最有效。

hadoop google-cloud-dataproc airflow oozie-workflow google-cloud-composer

4
推荐指数
1
解决办法
1562
查看次数

无法从云存储发生更改时触发的云功能触发composer/airflow dag

google-cloud-composer我已经在环境(dlkpipelinesv1 :composer-1.13.0-airflow-1.10.12)上创建并运行了 dags 。我可以手动触发这些 dag,并使用调度程序,但是当我通过cloud-functions检测存储桶中的更改来触发它们时,我陷入了困境google-cloud-storage

请注意,我有另一个 GC-Composer 环境(管道:composer-1.7.5-airflow-1.10.2),它使用相同的谷歌云功能来触发相关的 dags,并且它正在工作

我按照本指南创建了触发 dags 的函数。所以我检索了以下变量:

PROJECT_ID = <project_id>
CLIENT_ID = <client_id_retrieved_by_running_the_code_in_the_guide_within_my_gcp_console>
WEBSERVER_ID = <airflow_webserver_id>
DAG_NAME = <dag_to_trigger>
WEBSERVER_URL = f"https://{WEBSERVER_ID}.appspot.com/api/experimental/dags/{DAG_NAME}/dag_runs"


def file_listener(event, context):
    """Entry point of the cloud function: Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    logging.info("Running the file listener process")
    logging.info(f"event : {event}")
    logging.info(f"context : {context}") …
Run Code Online (Sandbox Code Playgroud)

python oauth google-oauth google-cloud-platform google-cloud-composer

4
推荐指数
1
解决办法
5893
查看次数

如何使用 Google Cloud Composer 设置 dbt?

我正在尝试在 Google Cloud Composer 上安装 dbt,但遇到了依赖性问题。我已按照本文中的说明进行操作: https: //blog.doit-intl.com/setup-dbt-with-cloud-composer-ab702454e27b但是在步骤 2:在 Composer 中安装软件包(airflow-dbt 和 dbt),它已经失败了。

我在云构建日志中找到以下内容:

ERROR: snowflake-connector-python 2.3.6 has requirement boto3<1.16,>=1.4.4, but you'll have boto3 1.17.85 which is incompatible.
ERROR: snowflake-connector-python 2.3.6 has requirement requests<2.24.0, but you'll have requests 2.24.0 which is incompatible.
ERROR: networkx 2.5.1 has requirement decorator<5,>=4.3, but you'll have decorator 5.0.9 which is incompatible.
ERROR: hologram 0.0.13 has requirement jsonschema<3.2,>=3.0, but you'll have jsonschema 3.2.0 which is incompatible.
ERROR: dbt-core 0.19.1 has requirement idna<2.10, but you'll have …
Run Code Online (Sandbox Code Playgroud)

google-cloud-platform google-cloud-composer dbt

4
推荐指数
1
解决办法
2809
查看次数

GCP Cloud Composer 和工作流程有什么区别?

云工作流程不具备调度功能。除此之外,这两种服务在功能方面有何区别?在哪种用例中,我们应该更喜欢工作流程而不是作曲家,反之亦然?

workflow google-cloud-platform airflow google-cloud-composer

4
推荐指数
1
解决办法
5301
查看次数

PythonVirtualenvOperator 因 TypeError 失败:无法 pickle 'module' 对象

Cloud Composer 2.1.xx 上会发生以下情况

我正在尝试将 PythonVirtualenvOperator 与模板化参数一起使用。不幸的是,操作员失败并出现以下错误:

TypeError: cannot pickle 'module' object
Run Code Online (Sandbox Code Playgroud)

这是我的 dag 的代码:

with models.DAG(
    'name',
    schedule_interval=None,
    start_date=datetime.datetime(2018, 1, 1),
    catchup=False) as dag:

t1 = PythonVirtualenvOperator(
    task_id='download',
    python_callable=update_files,
    requirements=['google-cloud-firestore==2.2.0'],
    python_version='3.8',
    op_kwargs={
        "inputPaths": '{{ dag_run.conf["inputPaths"] }}'
    }
)
Run Code Online (Sandbox Code Playgroud)

python 函数的代码如下所示:

def update_files(**kwargs):
    from google.cloud import firestore
    import datetime
    paths = kwargs['inputPaths']
    .....
Run Code Online (Sandbox Code Playgroud)

我尝试使用参数use_dill = True和建议的答案如何在气流中使用PythonVirtualenvOperator?但这并没有让任何事情变得更好。

google-cloud-platform google-cloud-composer

4
推荐指数
1
解决办法
1791
查看次数

如何将 Cloud Run 容器执行到 Airflow DAG 中?

我正在尝试使用 Cloud Run 来运行容器作为 Airflow 的 DAG 的任务。

似乎没有像 CloudRunOperator 或类似的东西,我在文档中找不到任何内容(Cloud Run 和 Airflow 都可以)。

有人处理过这个问题吗?如果是,我如何使用 Cloud Run 运行容器并处理 xcom?

提前致谢!!

google-cloud-platform airflow google-cloud-composer google-cloud-run

4
推荐指数
1
解决办法
5378
查看次数

在google composer中删除DAG - Airflow UI

我想从Airflow UI中删除DAG,该GCS/dags文件夹中不再提供.我知道Airflow有一种"新"方法可以使用airflow delete_dag my_dag_id命令从数据库中删除dag ,见/sf/answers/3477848041/

似乎在composer airflow版本中,该delete_dag命令尚不支持.

不要试试这个: 我也试过使用airflow resetdb,气流用户界面就死了

有没有办法删除当前不在gs://BUCKET/dags/文件夹中的dags ?

airflow google-cloud-composer

3
推荐指数
1
解决办法
1413
查看次数

Google Cloud Composer 变量不会传播到 Airflow

我正在尝试运行Google Cloud Composer 文档中的示例,但我发现了问题,主要有两个:

  • 通过 gcloud 命令行或 Web 界面创建的环境变量不会传播到 Airflow 层,从而导致 DAG 无法抱怨“变量 gcs_bucket 不存在”。如果我从 Airflow UI 添加变量,那么它就可以工作。

  • DAG 正确执行,但在任何阶段我都看不到在 Airflow 中看到的一组链接(树、图、...)。即使在执行成功完成之后。

我已经检查了服务帐户(在环境中默认创建)是否有权编辑和更新变量(编辑者角色)也可以通过 API 访问。

我发现文档不是最新的,例如创建我需要使用的变量:

gcloud composer environments update test-environment \ 
--location=us-central1 \
--update-env-variables=gcs_bucket=gs://airflow2
Run Code Online (Sandbox Code Playgroud)

而不是文章所说的。

我还要检查什么?自文档编写以来还有哪些变化?

编辑:这里描述一个相关的问题,它似乎解决了第二个问题。

编辑:有趣的是,如果我运行这个免费实验室,第二个问题就不会发生。第一个还在。我正在比较一个和另一个中的角色,从与自动创建的 Composer 项目无关的角色开始。并且没有以替代解决方案的格式创建的服务帐户。

我将感谢任何人可以提供的任何帮助。

google-cloud-platform airflow google-cloud-composer

3
推荐指数
1
解决办法
1790
查看次数

如何传递动态参数 Airflow 运算符?

我正在使用 Airflow 在 Google Cloud Composer 上运行 Spark 作业。我需要

  • 创建集群(用户提供的 YAML 参数)
  • spark 作业列表(作业参数也由每个作业 YAML 提供)

使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。

但是,考虑 DataprocClusterCreateOperator()

  • cluster_name
  • project_id
  • zone

和其他一些参数被标记为模板化。

如果我想将其他参数作为模板传递(目前不是这样)怎么办?-样image_versionnum_workersworker_machine_type等?

有什么解决方法吗?

google-cloud-platform airflow google-cloud-composer apache-airflow-xcom

3
推荐指数
1
解决办法
2762
查看次数

如何通过 yes 标志回填 Google Composer?

我正在尝试使用gcloud composer命令在 Google Composer 中进行回填,但我正在努力传递 a -yor--yes相应的--reset_dagruns参数。

我得到的回应是airflow: error: unrecognized arguments: -y

命令:

gcloud composer environments run my_env --project my_project --location us-east1 backfill -- --reset_dagruns -y -s 2020-01-01 -e 2020-01-31 my_dag
Run Code Online (Sandbox Code Playgroud)

我如何提供这个论点?

google-cloud-platform airflow google-cloud-composer

3
推荐指数
1
解决办法
668
查看次数