我知道如何为 Compute Engine 分配静态外部 IP 地址,但这可以通过 Google Cloud Composer (Airflow) 完成吗?我想大多数公司都需要该功能,因为他们通常会写回可能位于防火墙后面的仓库,但我找不到任何有关如何执行此操作的文档。
python google-compute-engine google-cloud-platform airflow google-cloud-composer
有没有办法将现有的 Airflow 实例迁移到 Google Cloud Composer?
我们目前正在使用数据库的 postgres 运行我们自己的 Airflow 实例。理想情况下,我们能够保留 DAG 的现有历史记录,我认为这需要复制整个数据库。这可能吗?
Airflow封装的 DAG似乎是实现合理的生产气流部署的重要组成部分。
我有一个带有动态 subDAG 的 DAG,由配置文件驱动,例如:
配置文件:
imports:
- project_foo
- project_bar`
Run Code Online (Sandbox Code Playgroud)
这会产生 subdag 任务,例如imports.project_{foo|bar}.step{1|2|3}.
我通常使用 python 的open函数读取配置文件,一个 laconfig = open(os.path.join(os.path.split(__file__)[0], 'config.yaml')
不幸的是,当使用打包的 DAG 时,这会导致错误:
Broken DAG: [/home/airflow/dags/workflows.zip] [Errno 20] Not a directory: '/home/airflow/dags/workflows.zip/config.yaml'
Run Code Online (Sandbox Code Playgroud)
有什么想法/最佳实践可以在这里推荐吗?
我目前需要为 Cloud Composer 中的 Airflow 实例提供 Airflow db 连接凭据。
我在 Airflow 连接 UI 上看到的只是airflow_db mysql airflow-sqlproxy-service.
我想通过 DataGrip 连接到它。
另一件事是,如果我想更改[core] sql_alchemy_conn覆盖环境变量,当我将它添加到 Cloud Composer 环境中的 env 变量时,我该怎么做,因为它受到限制。
我有一个在 Cloud Composer 上运行的 Airflow 环境(3 个n1-standard-1节点;图像版本:composer-1.4.0-airflow-1.10.0;配置覆盖:核心catchup_by_default=False;PyPI 包:)kubernetes==8.0.1。
在 DAG 运行期间,一些任务(所有 GKEPodOperator)由于气流工作器 pod 驱逐而失败。所有这些任务都设置为retries=0. 其中之一被重新排队并重试。当任务设置为 0 重试时,为什么会发生这种情况?为什么它只会发生在其中一项任务上?
我在 Google 的 Cloud Composer 上运行 Airflow。我现在用的是KubernetesPodOperator并想通过一个谷歌存储桶安装到目录荚gcsfuse。似乎要做到这一点,我需要提供此处指定的 k8s 特权安全上下文。似乎气流最近向 KubernetesPodOperator 添加了 security_context 参数。我在操作符中指定的安全上下文是:
security_context = {
'securityContext': {
'privileged': True,
'capabilities':
{'add': ['SYS_ADMIN']}
}
}
Run Code Online (Sandbox Code Playgroud)
当我尝试airflow test dag_id task_id date在气流工作器中运行时,pod 启动,当代码尝试通过 gcsfuse 挂载存储桶时,它会引发错误"fusermount: fuse device not found, try 'modprobe fuse' first"。这使得它看起来好像 security_context 不起作用(例如)。
我是否误解了运算符中的 security_context 参数和/或我的 securityContext 字典定义是否无效?
kubernetes google-kubernetes-engine airflow google-cloud-composer
通常在容器环境中,我可以轻松地安装我的私有依赖项,requirements.txt如下所示:
--index-url https://user:pass@some_repo.jfrog.io/some_repo/api/pypi/pypi/simple
some-private-lib
Run Code Online (Sandbox Code Playgroud)
该软件包"some-private-lib"是我想安装的那个。
在 GCP Composer 环境中,我尝试使用 GCloud 命令 (
gcloud composer environments update ENV_NAME --update-pypi-packages-from-file ./requirements.txt --location LOCATION),但它抱怨requirements.txt不遵循 PEP-508 中定义的格式。然后我找到了这个关于如何从私有仓库安装依赖项的官方指南,但它不是很清楚。按照指南中的说明,我创建了一个pip.conf包含以下内容的文件:
[global]
extra-index-url=https://user:pass@some_repo.jfrog.io/some_repo/api/pypi/pypi/simple
Run Code Online (Sandbox Code Playgroud)
然后把它放到我的环境中的GCS斗:gs://us-central1-xxxx-bucket/config/pip/pip.conf。
现在我gcloud composer environments update ENV_NAME --update-pypi-packages-from-file ./requirements.txt --location LOCATION再次运行命令 ( ),requirements.txt只包含一行:some-private-lib。它因一个非常不透明的错误而失败:failed: Failed to install PyPI packages.
我做错了什么?还有其他可用的解决方法吗?谢谢!
我目前正在使用 AirflowPostgresToGoogleCloudStorageOperator并将GoogleCloudStorageToBigQueryOperator我的 Postgres 数据库(托管在 AWS RDS 上)的每个表导出到 BigQuery。它有效,但我有 75 个表,所以 Airflow 创建了 75 * 2 个工作。由于我是 Airflow 的新手,我不知道这是否是一个好习惯。
无论如何,我想找到一种方法将所有表一次(pg_dump?)导出到 GCS,然后将它们导入 BigQuery。
postgresql google-cloud-storage google-bigquery airflow google-cloud-composer
在使用 DataFlowPythonOperator 之前,我使用了气流的 BashOperator。它工作正常。我的光束管道需要一个特定的参数,这是我在 BashOperator 中使用的命令。
仅供参考 - 此光束管道用于将 CSV 文件转换为镶木地板。
python /home/airflow/gcs/pyFile.py --runner DataflowRunner --project my-project --jobname my-job--num-workers 3 --temp_location gs://path/Temp/ --staging_location gs://path/Staging/ --input gs://path/*.txt --odir gs://path/output --ofile current
Run Code Online (Sandbox Code Playgroud)
这些是我必须传递的必需参数,以使我的光束管道正常工作。
现在如何在DataFlowPythonOperator 中传递这些参数?
我试过了,但我不知道我应该在哪里提到所有参数。我试过这样的事情:
task1 = DataFlowPythonOperator(
task_id = 'my_task',
py_file = '/home/airflow/gcs/pyfile.py',
gcp_conn_id='google_cloud_default',
options={
"num-workers" : 3,
"input" : 'gs://path/*.txt',
"odir" : 'gs://path/',
"ofile" : 'current',
"jobname" : 'my-job'
},
dataflow_default_options={
"project": 'my-project',
"staging_location": 'gs://path/Staging/',
"temp_location": 'gs://path/Temp/',
},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
使用当前的脚本(虽然我不确定它的格式是否正确),这是我在日志中得到的:
[2020-03-06 05:08:48,070] {base_task_runner.py:115} INFO - Job …Run Code Online (Sandbox Code Playgroud) 我正在运行composer-1.16.6-airflow-1.10.15。
对于每日计划的 DAG,我想编写一个自定义on_failure_notification,仅在任务实例连续多天失败时发送通知。我的计划是获取 dag 运行的失败任务实例并检查每个最后成功执行日期:
def my_on_failure_notification(context):
failed_tis = context["dag_run"].get_task_instances(state=State.FAILED)
tis_to_notify_about = [ti.task_id for ti in failed_tis if ti.previous_execution_date_success < days_ago(2)]
Run Code Online (Sandbox Code Playgroud)
此操作失败并出现以下跟踪:
[...]
File "/home/airflow/gcs/dags/xxx.py", line 94, in my_on_failure_notification
ti.task_id for ti in failed_tis if ti.previous_execution_date_success < days_ago(2)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 625, in previous_execution_date_success
prev_ti = self._get_previous_ti(state=State.SUCCESS)
File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 582, in _get_previous_ti
dag = self.task.dag
AttributeError: 'TaskInstance' object has no attribute 'task'
Run Code Online (Sandbox Code Playgroud)
我认为发生这种情况是因为 TI 是作为 SQLAlchemy 模型检索的,该模型不包含该task …