标签: google-cloud-composer

您可以获得 Google Cloud Composer / Airflow 的静态外部 IP 地址吗?

我知道如何为 Compute Engine 分配静态外部 IP 地址,但这可以通过 Google Cloud Composer (Airflow) 完成吗?我想大多数公司都需要该功能,因为他们通常会写回可能位于防火墙后面的仓库,但我找不到任何有关如何执行此操作的文档。

python google-compute-engine google-cloud-platform airflow google-cloud-composer

5
推荐指数
2
解决办法
2659
查看次数

将现有的 Airflow DB 迁移到 Cloud Composer

有没有办法将现有的 Airflow 实例迁移到 Google Cloud Composer?

我们目前正在使用数据库的 postgres 运行我们自己的 Airflow 实例。理想情况下,我们能够保留 DAG 的现有历史记录,我认为这需要复制整个数据库。这可能吗?

google-cloud-platform airflow google-cloud-composer

5
推荐指数
1
解决办法
1178
查看次数

如何从气流打包的 DAG 中读取配置文件?

Airflow封装的 DAG似乎是实现合理的生产气流部署的重要组成部分。

我有一个带有动态 subDAG 的 DAG,由配置文件驱动,例如:

配置文件:

imports:
  - project_foo
  - project_bar`
Run Code Online (Sandbox Code Playgroud)

这会产生 subdag 任务,例如imports.project_{foo|bar}.step{1|2|3}.

我通常使用 python 的open函数读取配置文件,一个 laconfig = open(os.path.join(os.path.split(__file__)[0], 'config.yaml')

不幸的是,当使用打包的 DAG 时,这会导致错误:

Broken DAG: [/home/airflow/dags/workflows.zip] [Errno 20] Not a directory: '/home/airflow/dags/workflows.zip/config.yaml'
Run Code Online (Sandbox Code Playgroud)

有什么想法/最佳实践可以在这里推荐吗?

airflow google-cloud-composer

5
推荐指数
1
解决办法
2261
查看次数

如何从 Google Cloud Composer 获取 Airflow db 凭据

我目前需要为 Cloud Composer 中的 Airflow 实例提供 Airflow db 连接凭据。
我在 Airflow 连接 UI 上看到的只是airflow_db mysql airflow-sqlproxy-service.

我想通过 DataGrip 连接到它。
另一件事是,如果我想更改[core] sql_alchemy_conn覆盖环境变量,当我将它添加到 Cloud Composer 环境中的 env 变量时,我该怎么做,因为它受到限制。

google-cloud-platform airflow google-cloud-composer

5
推荐指数
1
解决办法
1440
查看次数

尽管重试= 0,但气流任务在失败后重试

我有一个在 Cloud Composer 上运行的 Airflow 环境(3 个n1-standard-1节点;图像版本:composer-1.4.0-airflow-1.10.0;配置覆盖:核心catchup_by_default=False;PyPI 包:)kubernetes==8.0.1

在 DAG 运行期间,一些任务(所有 GKEPodOperator)由于气流工作器 pod 驱逐而失败。所有这些任务都设置为retries=0. 其中之一被重新排队并重试。当任务设置为 0 重试时,为什么会发生这种情况?为什么它只会发生在其中一项任务上?

airflow airflow-scheduler google-cloud-composer

5
推荐指数
1
解决办法
1557
查看次数

Airflow 中的 KubernetesPodOperator 特权 security_context

我在 Google 的 Cloud Composer 上运行 Airflow。我现在用的是KubernetesPodOperator并想通过一个谷歌存储桶安装到目录荚gcsfuse。似乎要做到这一点,我需要提供此处指定的 k8s 特权安全上下文。似乎气流最近向 KubernetesPodOperator 添加了 security_context 参数。我在操作符中指定的安全上下文是:

security_context = {
  'securityContext': {
    'privileged': True,
    'capabilities':
      {'add': ['SYS_ADMIN']}
  }
}
Run Code Online (Sandbox Code Playgroud)

当我尝试airflow test dag_id task_id date在气流工作器中运行时,pod 启动,当代码尝试通过 gcsfuse 挂载存储桶时,它会引发错误"fusermount: fuse device not found, try 'modprobe fuse' first"。这使得它看起来好像 security_context 不起作用(例如)。

我是否误解了运算符中的 security_context 参数和/或我的 securityContext 字典定义是否无效?

kubernetes google-kubernetes-engine airflow google-cloud-composer

5
推荐指数
1
解决办法
866
查看次数

有没有在 GCP Composer Airflow 上安装私有依赖的成功案例?

背景资料

通常在容器环境中,我可以轻松地安装我的私有依赖项,requirements.txt如下所示:

--index-url https://user:pass@some_repo.jfrog.io/some_repo/api/pypi/pypi/simple

some-private-lib
Run Code Online (Sandbox Code Playgroud)

该软件包"some-private-lib"是我想安装的那个。

问题

在 GCP Composer 环境中,我尝试使用 GCloud 命令 ( gcloud composer environments update ENV_NAME --update-pypi-packages-from-file ./requirements.txt --location LOCATION),但它抱怨requirements.txt不遵循 PEP-508 中定义的格式。然后我找到了这个关于如何从私有仓库安装依赖项的官方指南,但它不是很清楚。按照指南中的说明,我创建了一个pip.conf包含以下内容的文件:

[global]
extra-index-url=https://user:pass@some_repo.jfrog.io/some_repo/api/pypi/pypi/simple
Run Code Online (Sandbox Code Playgroud)

然后把它放到我的环境中的GCS斗:gs://us-central1-xxxx-bucket/config/pip/pip.conf

现在我gcloud composer environments update ENV_NAME --update-pypi-packages-from-file ./requirements.txt --location LOCATION再次运行命令 ( ),requirements.txt只包含一行:some-private-lib。它因一个非常不透明的错误而失败:failed: Failed to install PyPI packages.

我做错了什么?还有其他可用的解决方法吗?谢谢!

pip airflow google-cloud-composer

5
推荐指数
1
解决办法
604
查看次数

Airflow 将 postgres 数据库的所有表导出到 BigQuery

我目前正在使用 AirflowPostgresToGoogleCloudStorageOperator并将GoogleCloudStorageToBigQueryOperator我的 Postgres 数据库(托管在 AWS RDS 上)的每个表导出到 BigQuery。它有效,但我有 75 个表,所以 Airflow 创建了 75 * 2 个工作。由于我是 Airflow 的新手,我不知道这是否是一个好习惯。

无论如何,我想找到一种方法将所有表一次(pg_dump?)导出到 GCS,然后将它们导入 BigQuery。

postgresql google-cloud-storage google-bigquery airflow google-cloud-composer

5
推荐指数
1
解决办法
1796
查看次数

如何将气流 DataFlowPythonOperator 用于梁管道?

在使用 DataFlowPythonOperator 之前,我使用了气流的 BashOperator。它工作正常。我的光束管道需要一个特定的参数,这是我在 BashOperator 中使用的命令。

仅供参考 - 此光束管道用于将 CSV 文件转换为镶木地板。

python /home/airflow/gcs/pyFile.py --runner DataflowRunner --project my-project --jobname my-job--num-workers 3 --temp_location gs://path/Temp/ --staging_location gs://path/Staging/ --input gs://path/*.txt --odir gs://path/output --ofile current
Run Code Online (Sandbox Code Playgroud)

这些是我必须传递的必需参数,以使我的光束管道正常工作。

现在如何在DataFlowPythonOperator 中传递这些参数?

我试过了,但我不知道我应该在哪里提到所有参数。我试过这样的事情:

    task1 = DataFlowPythonOperator(
    task_id = 'my_task',
    py_file = '/home/airflow/gcs/pyfile.py',
    gcp_conn_id='google_cloud_default',
    options={
        "num-workers" : 3,
        "input" : 'gs://path/*.txt',
        "odir" : 'gs://path/',
        "ofile" : 'current',
        "jobname" : 'my-job'
    },
    dataflow_default_options={
        "project": 'my-project',
        "staging_location": 'gs://path/Staging/',
        "temp_location": 'gs://path/Temp/',    
  },
  dag=dag
)
Run Code Online (Sandbox Code Playgroud)

使用当前的脚本(虽然我不确定它的格式是否正确),这是我在日志中得到的:

    [2020-03-06 05:08:48,070] {base_task_runner.py:115} INFO - Job …
Run Code Online (Sandbox Code Playgroud)

google-cloud-platform airflow google-cloud-composer

5
推荐指数
1
解决办法
1006
查看次数

Airflow:通过 DagRun.get_task_instances() 检索任务实例后,“TaskInstance”对象没有属性“task”

我正在运行composer-1.16.6-airflow-1.10.15。

对于每日计划的 DAG,我想编写一个自定义on_failure_notification,仅在任务实例连续多天失败时发送通知。我的计划是获取 dag 运行的失败任务实例并检查每个最后成功执行日期:

def my_on_failure_notification(context):
    failed_tis = context["dag_run"].get_task_instances(state=State.FAILED)
    tis_to_notify_about = [ti.task_id for ti in failed_tis if ti.previous_execution_date_success < days_ago(2)]
Run Code Online (Sandbox Code Playgroud)

此操作失败并出现以下跟踪:

[...]
File "/home/airflow/gcs/dags/xxx.py", line 94, in my_on_failure_notification
ti.task_id for ti in failed_tis if ti.previous_execution_date_success < days_ago(2)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 625, in previous_execution_date_success
prev_ti = self._get_previous_ti(state=State.SUCCESS)
File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 582, in _get_previous_ti
dag = self.task.dag
AttributeError: 'TaskInstance' object has no attribute 'task'
Run Code Online (Sandbox Code Playgroud)

我认为发生这种情况是因为 TI 是作为 SQLAlchemy 模型检索的,该模型不包含该task …

python airflow google-cloud-composer

5
推荐指数
0
解决办法
3476
查看次数