标签: google-cloud-composer

Airflow Scheduler 不断崩溃,数据库连接错误(Google Composer)

我使用 Google Composer 一段时间了 ( composer-0.5.2-airflow-1.9.0),并且在使用 Airflow 调度程序时遇到了一些问题。调度程序容器有时会崩溃,并且可能会陷入锁定状态,无法启动任何新任务(数据库连接错误),因此我必须重新创建整个 Composer 环境。这次,CrashLoopBackOff调度程序 Pod 无法再重新启动。该错误与我之前遇到的非常相似。这是 Stackdriver 的回溯:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 826, in scheduler
    job.run()
  File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 198, in run
    self._execute()
  File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 1549, in _execute
    self._execute_helper(processor_manager)
  File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 1594, in _execute_helper
    self.reset_state_for_orphaned_tasks(session=session)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 266, in reset_state_for_orphaned_tasks
    .filter(or_(*filter_for_tis), TI.state.in_(resettable_states))
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2783, in …
Run Code Online (Sandbox Code Playgroud)

airflow airflow-scheduler google-cloud-composer

5
推荐指数
1
解决办法
4024
查看次数

Apache Airflow - 如何在使用 TriggerDagRunOperator 触发的流中检索操作员外部的 dag_run 数据

我设置了两个 DAG,我们称第一个为 Orchestrator,第二个为 Worker。Orchestrator 的工作是从 API 检索列表,并针对此列表中的每个元素,使用一些参数触发工作 DAG。

我将两个工作流程分开的原因是我希望能够仅重播失败的“工作人员”工作流程(如果一个失败,我不想重播所有工作人员实例)。

我能够使事情正常进行,但现在我发现监控有多么困难,因为我的 task_id 对所有人来说都是相同的,所以我决定根据“协调器”工作流程从 API 检索的值来使用动态 task_id。

但是,我无法从运算符外部的 dag_run 对象检索值。基本上,我希望这个能够工作:

with models.DAG('specific_workflow', schedule_interval=None, default_args=default_dag_args) as dag:
    name = context['dag_run'].name
    hello_world = BashOperator(task_id='hello_{}'.format(name), bash_command="echo Hello {{ dag_run.conf.name }}", dag=dag)
    bye = BashOperator(task_id='bye_{}'.format(name), bash_command="echo Goodbye {{ dag_run.conf.name }}", dag=dag)

    hello_world >> bye
Run Code Online (Sandbox Code Playgroud)

但我无法定义这个“上下文”对象。但是,我可以从操作符(例如 PythonOperator 和 BashOperator)访问它。

是否可以在操作符之外检索 dag_run 对象?

airflow google-cloud-composer

5
推荐指数
1
解决办法
7403
查看次数

如何在气流中创建自定义运算符并将其用于通过云编写器运行的气流模板(在Google云平台中)

我需要创建一个自定义气流操作符,我应该能够在云编写器中运行的气流模板(写在python中)中使用...

如果我创建自定义气流运算符,我如何在云上运行的模板中使用它(如何使其可供所有人使用,以便他们可以使用该运算符)

python apache google-cloud-platform airflow google-cloud-composer

5
推荐指数
1
解决办法
5250
查看次数

Google Cloud Composer 服务器遇到临时错误,无法完成您的请求

运行几天后,Google Cloud Composer Web UI 无限期地返回 502 服务器错误:

Error: Server Error
The server encountered a temporary error and could not complete your request.
Please try again in 30 seconds.
Run Code Online (Sandbox Code Playgroud)

修复它的唯一方法是重新创建 Composer 环境。尽管运行几天后,新环境因相同的错误而崩溃。

Image version: composer-1.4.0-airflow-1.10.0
Python version: 3
Run Code Online (Sandbox Code Playgroud)

有谁知道根本原因是什么?

airflow google-cloud-composer

5
推荐指数
1
解决办法
2655
查看次数

默认禁用新 DAG

Cloud Composer / Airflow 中是否有一个设置可以默认禁用 DAG 文件夹中的新 DAG,而无需在 DAG 文件中指定?

我希望能够将这些 DAG 加载到开发环境中,用户只需手动运行这些 DAG,而不是对其进行调度。

我看了这里,https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg 但我找不到任何明显的东西。

airflow google-cloud-composer

5
推荐指数
1
解决办法
2753
查看次数

由于无法读取日志文件,任务失败

Composer 由于无法读取日志文件而导致任务失败,它抱怨编码不正确。

这是 UI 中显示的日志:

*** Unable to read remote log from gs://bucket/logs/campaign_exceptions_0_0_1/merge_campaign_exceptions/2019-08-03T10:00:00+00:00/1.log
*** 'ascii' codec can't decode byte 0xc2 in position 6986: ordinal not in range(128)

*** Log file does not exist: /home/airflow/gcs/logs/campaign_exceptions_0_0_1/merge_campaign_exceptions/2019-08-03T10:00:00+00:00/1.log
*** Fetching from: http://airflow-worker-68dc66c9db-x945n:8793/log/campaign_exceptions_0_0_1/merge_campaign_exceptions/2019-08-03T10:00:00+00:00/1.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='airflow-worker-68dc66c9db-x945n', port=8793): Max retries exceeded with url: /log/campaign_exceptions_0_0_1/merge_campaign_exceptions/2019-08-03T10:00:00+00:00/1.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f1c9ff19d10>: Failed to establish a new connection: [Errno -2] Name or service not known',))
Run Code Online (Sandbox Code Playgroud)

我尝试在谷歌云控制台中查看该文件,它也会引发错误:

Failed to load

Tracking Number: 8075820889980640204 …
Run Code Online (Sandbox Code Playgroud)

airflow google-cloud-composer

5
推荐指数
1
解决办法
3020
查看次数

使用 KubernetesPodOperator 创建 sidecar

我希望在使用时创建一个边车容器KubernetesPodOperator。我看到了创建选项init containerpod_mutation_hook但没有看到创建sidecar. 如果我创建一个init container必须在启动实际容器之前完成的任务,但我不希望这样,并且只要端口中的主容器处于活动状态,就需要边车运行。

airflow google-cloud-composer

5
推荐指数
1
解决办法
1468
查看次数

GKE 自动扩缩不会缩小规模

我们使用 GKE(Google Kubernetes Engine)在 GCC(Google Cloude Composer)中运行 Airflow 作为我们的数据管道。

我们一开始有 6 个节点,后来意识到成本飙升,而且我们没有使用那么多的 CPU。所以我们认为我们可以降低最大值,但也可以启用自动缩放。

由于我们在夜间运行管道,并且白天只运行较小的作业,因此我们希望在 1-3 个节点之间运行自动缩放。

因此,我们在 GKE 节点池上启用了自动缩放,但没有按照他们的建议在 GCE 实例组上启用自动缩放。然而,我们得到这个: 节点池无法扩展

为什么是这样?

下面是过去 4 天的 CPU 利用率图表: 在此输入图像描述

我们从未超过 20% 的使用率,那为什么不缩小规模呢?

今天早上我们手动将其缩小到 3 个节点。

google-compute-engine google-cloud-platform google-kubernetes-engine google-cloud-composer

5
推荐指数
1
解决办法
4260
查看次数

如何将单个文件作为卷挂载到 KubernetesPodOperator?

我有一个 docker 映像,需要在启动时安装 JSON 凭证文件。容器通过如下命令启动:

docker run -v [CREDENTIALS_FILE]:/credentials.json image_name

该映像位于 Google 容器注册表中,我想使用 KubernetesPodOperator 在 Cloud Composer dag 中启动它。

有没有办法通过 KubernetesPodOperator 挂载单个文件?理想情况下,该文件将托管在云存储位置。我读到有一个volume/volume_mount选项,但传递单个文件似乎是一件繁重的事情——希望还有另一个我忽略的选项。

KubernetesPodOperator(namespace='default',
                      image="gcr.io/image_name,
                      name="start-container-image",
                      task_id="start-container-image",
                      volume=[?],
                      volume_mounts=[?],
                      dag=dag)
Run Code Online (Sandbox Code Playgroud)

google-kubernetes-engine airflow google-cloud-composer

5
推荐指数
1
解决办法
2698
查看次数

如何在 Airflow 中使用电子邮件操作符附加文件

我使用了参数 files =["abc.txt"]。我从气流文档中获取了信息... https://airflow.readthedocs.io/en/stable/_modules/airflow/operators/email_operator.html

但我收到找不到该文件的错误。我的问题是这个气流将从哪里选择我的文件。是来自 Composer 环境中的 GCS Bucket 还是 DAG 文件夹?

我需要在哪里上传文件以及“文件”参数的正确语法是什么?

提前致谢。

google-cloud-platform airflow google-cloud-composer

5
推荐指数
1
解决办法
2万
查看次数