我使用 Google Composer 一段时间了 ( composer-0.5.2-airflow-1.9.0),并且在使用 Airflow 调度程序时遇到了一些问题。调度程序容器有时会崩溃,并且可能会陷入锁定状态,无法启动任何新任务(数据库连接错误),因此我必须重新创建整个 Composer 环境。这次,CrashLoopBackOff调度程序 Pod 无法再重新启动。该错误与我之前遇到的非常相似。这是 Stackdriver 的回溯:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 826, in scheduler
job.run()
File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 198, in run
self._execute()
File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 1549, in _execute
self._execute_helper(processor_manager)
File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 1594, in _execute_helper
self.reset_state_for_orphaned_tasks(session=session)
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 266, in reset_state_for_orphaned_tasks
.filter(or_(*filter_for_tis), TI.state.in_(resettable_states))
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2783, in …Run Code Online (Sandbox Code Playgroud) 我设置了两个 DAG,我们称第一个为 Orchestrator,第二个为 Worker。Orchestrator 的工作是从 API 检索列表,并针对此列表中的每个元素,使用一些参数触发工作 DAG。
我将两个工作流程分开的原因是我希望能够仅重播失败的“工作人员”工作流程(如果一个失败,我不想重播所有工作人员实例)。
我能够使事情正常进行,但现在我发现监控有多么困难,因为我的 task_id 对所有人来说都是相同的,所以我决定根据“协调器”工作流程从 API 检索的值来使用动态 task_id。
但是,我无法从运算符外部的 dag_run 对象检索值。基本上,我希望这个能够工作:
with models.DAG('specific_workflow', schedule_interval=None, default_args=default_dag_args) as dag:
name = context['dag_run'].name
hello_world = BashOperator(task_id='hello_{}'.format(name), bash_command="echo Hello {{ dag_run.conf.name }}", dag=dag)
bye = BashOperator(task_id='bye_{}'.format(name), bash_command="echo Goodbye {{ dag_run.conf.name }}", dag=dag)
hello_world >> bye
Run Code Online (Sandbox Code Playgroud)
但我无法定义这个“上下文”对象。但是,我可以从操作符(例如 PythonOperator 和 BashOperator)访问它。
是否可以在操作符之外检索 dag_run 对象?
我需要创建一个自定义气流操作符,我应该能够在云编写器中运行的气流模板(写在python中)中使用...
如果我创建自定义气流运算符,我如何在云上运行的模板中使用它(如何使其可供所有人使用,以便他们可以使用该运算符)
python apache google-cloud-platform airflow google-cloud-composer
运行几天后,Google Cloud Composer Web UI 无限期地返回 502 服务器错误:
Error: Server Error
The server encountered a temporary error and could not complete your request.
Please try again in 30 seconds.
Run Code Online (Sandbox Code Playgroud)
修复它的唯一方法是重新创建 Composer 环境。尽管运行几天后,新环境因相同的错误而崩溃。
Image version: composer-1.4.0-airflow-1.10.0
Python version: 3
Run Code Online (Sandbox Code Playgroud)
有谁知道根本原因是什么?
Cloud Composer / Airflow 中是否有一个设置可以默认禁用 DAG 文件夹中的新 DAG,而无需在 DAG 文件中指定?
我希望能够将这些 DAG 加载到开发环境中,用户只需手动运行这些 DAG,而不是对其进行调度。
我看了这里,https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg 但我找不到任何明显的东西。
Composer 由于无法读取日志文件而导致任务失败,它抱怨编码不正确。
这是 UI 中显示的日志:
*** Unable to read remote log from gs://bucket/logs/campaign_exceptions_0_0_1/merge_campaign_exceptions/2019-08-03T10:00:00+00:00/1.log
*** 'ascii' codec can't decode byte 0xc2 in position 6986: ordinal not in range(128)
*** Log file does not exist: /home/airflow/gcs/logs/campaign_exceptions_0_0_1/merge_campaign_exceptions/2019-08-03T10:00:00+00:00/1.log
*** Fetching from: http://airflow-worker-68dc66c9db-x945n:8793/log/campaign_exceptions_0_0_1/merge_campaign_exceptions/2019-08-03T10:00:00+00:00/1.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='airflow-worker-68dc66c9db-x945n', port=8793): Max retries exceeded with url: /log/campaign_exceptions_0_0_1/merge_campaign_exceptions/2019-08-03T10:00:00+00:00/1.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f1c9ff19d10>: Failed to establish a new connection: [Errno -2] Name or service not known',))
Run Code Online (Sandbox Code Playgroud)
我尝试在谷歌云控制台中查看该文件,它也会引发错误:
Failed to load
Tracking Number: 8075820889980640204 …Run Code Online (Sandbox Code Playgroud) 我希望在使用时创建一个边车容器KubernetesPodOperator。我看到了创建选项init container,pod_mutation_hook但没有看到创建sidecar. 如果我创建一个init container必须在启动实际容器之前完成的任务,但我不希望这样,并且只要端口中的主容器处于活动状态,就需要边车运行。
我们使用 GKE(Google Kubernetes Engine)在 GCC(Google Cloude Composer)中运行 Airflow 作为我们的数据管道。
我们一开始有 6 个节点,后来意识到成本飙升,而且我们没有使用那么多的 CPU。所以我们认为我们可以降低最大值,但也可以启用自动缩放。
由于我们在夜间运行管道,并且白天只运行较小的作业,因此我们希望在 1-3 个节点之间运行自动缩放。
因此,我们在 GKE 节点池上启用了自动缩放,但没有按照他们的建议在 GCE 实例组上启用自动缩放。然而,我们得到这个:

为什么是这样?
我们从未超过 20% 的使用率,那为什么不缩小规模呢?
今天早上我们手动将其缩小到 3 个节点。
google-compute-engine google-cloud-platform google-kubernetes-engine google-cloud-composer
我有一个 docker 映像,需要在启动时安装 JSON 凭证文件。容器通过如下命令启动:
docker run -v [CREDENTIALS_FILE]:/credentials.json image_name
该映像位于 Google 容器注册表中,我想使用 KubernetesPodOperator 在 Cloud Composer dag 中启动它。
有没有办法通过 KubernetesPodOperator 挂载单个文件?理想情况下,该文件将托管在云存储位置。我读到有一个volume/volume_mount选项,但传递单个文件似乎是一件繁重的事情——希望还有另一个我忽略的选项。
KubernetesPodOperator(namespace='default',
image="gcr.io/image_name,
name="start-container-image",
task_id="start-container-image",
volume=[?],
volume_mounts=[?],
dag=dag)
Run Code Online (Sandbox Code Playgroud) 我使用了参数 files =["abc.txt"]。我从气流文档中获取了信息... https://airflow.readthedocs.io/en/stable/_modules/airflow/operators/email_operator.html
但我收到找不到该文件的错误。我的问题是这个气流将从哪里选择我的文件。是来自 Composer 环境中的 GCS Bucket 还是 DAG 文件夹?
我需要在哪里上传文件以及“文件”参数的正确语法是什么?
提前致谢。