我有一些复杂的 Oozie 工作流程需要从本地 Hadoop 迁移到 GCP Dataproc。工作流程由 shell 脚本、Python 脚本、Spark-Scala 作业、Sqoop 作业等组成。
我遇到了一些包含我的工作流程调度需求的潜在解决方案:
请让我知道哪种选项在性能、成本和迁移复杂性方面最有效。
hadoop google-cloud-dataproc airflow oozie-workflow google-cloud-composer
google-cloud-composer我已经在环境(dlkpipelinesv1 :composer-1.13.0-airflow-1.10.12)上创建并运行了 dags 。我可以手动触发这些 dag,并使用调度程序,但是当我通过cloud-functions检测存储桶中的更改来触发它们时,我陷入了困境google-cloud-storage。
请注意,我有另一个 GC-Composer 环境(管道:composer-1.7.5-airflow-1.10.2),它使用相同的谷歌云功能来触发相关的 dags,并且它正在工作。
我按照本指南创建了触发 dags 的函数。所以我检索了以下变量:
PROJECT_ID = <project_id>
CLIENT_ID = <client_id_retrieved_by_running_the_code_in_the_guide_within_my_gcp_console>
WEBSERVER_ID = <airflow_webserver_id>
DAG_NAME = <dag_to_trigger>
WEBSERVER_URL = f"https://{WEBSERVER_ID}.appspot.com/api/experimental/dags/{DAG_NAME}/dag_runs"
def file_listener(event, context):
"""Entry point of the cloud function: Triggered by a change to a Cloud Storage bucket.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
logging.info("Running the file listener process")
logging.info(f"event : {event}")
logging.info(f"context : {context}") …Run Code Online (Sandbox Code Playgroud) python oauth google-oauth google-cloud-platform google-cloud-composer
我正在尝试在 Google Cloud Composer 上安装 dbt,但遇到了依赖性问题。我已按照本文中的说明进行操作: https: //blog.doit-intl.com/setup-dbt-with-cloud-composer-ab702454e27b但是在步骤 2:在 Composer 中安装软件包(airflow-dbt 和 dbt),它已经失败了。
我在云构建日志中找到以下内容:
ERROR: snowflake-connector-python 2.3.6 has requirement boto3<1.16,>=1.4.4, but you'll have boto3 1.17.85 which is incompatible.
ERROR: snowflake-connector-python 2.3.6 has requirement requests<2.24.0, but you'll have requests 2.24.0 which is incompatible.
ERROR: networkx 2.5.1 has requirement decorator<5,>=4.3, but you'll have decorator 5.0.9 which is incompatible.
ERROR: hologram 0.0.13 has requirement jsonschema<3.2,>=3.0, but you'll have jsonschema 3.2.0 which is incompatible.
ERROR: dbt-core 0.19.1 has requirement idna<2.10, but you'll have …Run Code Online (Sandbox Code Playgroud) 云工作流程不具备调度功能。除此之外,这两种服务在功能方面有何区别?在哪种用例中,我们应该更喜欢工作流程而不是作曲家,反之亦然?
workflow google-cloud-platform airflow google-cloud-composer
Cloud Composer 2.1.xx 上会发生以下情况
我正在尝试将 PythonVirtualenvOperator 与模板化参数一起使用。不幸的是,操作员失败并出现以下错误:
TypeError: cannot pickle 'module' object
Run Code Online (Sandbox Code Playgroud)
这是我的 dag 的代码:
with models.DAG(
'name',
schedule_interval=None,
start_date=datetime.datetime(2018, 1, 1),
catchup=False) as dag:
t1 = PythonVirtualenvOperator(
task_id='download',
python_callable=update_files,
requirements=['google-cloud-firestore==2.2.0'],
python_version='3.8',
op_kwargs={
"inputPaths": '{{ dag_run.conf["inputPaths"] }}'
}
)
Run Code Online (Sandbox Code Playgroud)
python 函数的代码如下所示:
def update_files(**kwargs):
from google.cloud import firestore
import datetime
paths = kwargs['inputPaths']
.....
Run Code Online (Sandbox Code Playgroud)
我尝试使用参数use_dill = True和建议的答案如何在气流中使用PythonVirtualenvOperator?但这并没有让任何事情变得更好。
我正在尝试使用 Cloud Run 来运行容器作为 Airflow 的 DAG 的任务。
似乎没有像 CloudRunOperator 或类似的东西,我在文档中找不到任何内容(Cloud Run 和 Airflow 都可以)。
有人处理过这个问题吗?如果是,我如何使用 Cloud Run 运行容器并处理 xcom?
提前致谢!!
google-cloud-platform airflow google-cloud-composer google-cloud-run
我想从Airflow UI中删除DAG,该GCS/dags文件夹中不再提供.我知道Airflow有一种"新"方法可以使用airflow delete_dag my_dag_id命令从数据库中删除dag
,见/sf/answers/3477848041/
似乎在composer airflow版本中,该delete_dag命令尚不支持.
不要试试这个: 我也试过使用airflow resetdb,气流用户界面就死了
有没有办法删除当前不在gs://BUCKET/dags/文件夹中的dags ?
我正在尝试运行Google Cloud Composer 文档中的示例,但我发现了问题,主要有两个:
通过 gcloud 命令行或 Web 界面创建的环境变量不会传播到 Airflow 层,从而导致 DAG 无法抱怨“变量 gcs_bucket 不存在”。如果我从 Airflow UI 添加变量,那么它就可以工作。
DAG 正确执行,但在任何阶段我都看不到在 Airflow 中看到的一组链接(树、图、...)。即使在执行成功完成之后。
我已经检查了服务帐户(在环境中默认创建)是否有权编辑和更新变量(编辑者角色)也可以通过 API 访问。
我发现文档不是最新的,例如创建我需要使用的变量:
gcloud composer environments update test-environment \
--location=us-central1 \
--update-env-variables=gcs_bucket=gs://airflow2
Run Code Online (Sandbox Code Playgroud)
而不是文章所说的。
我还要检查什么?自文档编写以来还有哪些变化?
编辑:有趣的是,如果我运行这个免费实验室,第二个问题就不会发生。第一个还在。我正在比较一个和另一个中的角色,从与自动创建的 Composer 项目无关的角色开始。并且没有以替代解决方案的格式创建的服务帐户。
我将感谢任何人可以提供的任何帮助。
我正在使用 Airflow 在 Google Cloud Composer 上运行 Spark 作业。我需要
使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。
但是,考虑 DataprocClusterCreateOperator()
cluster_nameproject_idzone和其他一些参数被标记为模板化。
如果我想将其他参数作为模板传递(目前不是这样)怎么办?-样image_version,
num_workers,worker_machine_type等?
有什么解决方法吗?
google-cloud-platform airflow google-cloud-composer apache-airflow-xcom
我正在尝试使用gcloud composer命令在 Google Composer 中进行回填,但我正在努力传递 a -yor--yes相应的--reset_dagruns参数。
我得到的回应是airflow: error: unrecognized arguments: -y。
命令:
gcloud composer environments run my_env --project my_project --location us-east1 backfill -- --reset_dagruns -y -s 2020-01-01 -e 2020-01-31 my_dag
Run Code Online (Sandbox Code Playgroud)
我如何提供这个论点?