如何将 Cloud Run 容器执行到 Airflow DAG 中?

wal*_*r91 4 google-cloud-platform airflow google-cloud-composer google-cloud-run

我正在尝试使用 Cloud Run 来运行容器作为 Airflow 的 DAG 的任务。

似乎没有像 CloudRunOperator 或类似的东西,我在文档中找不到任何内容(Cloud Run 和 Airflow 都可以)。

有人处理过这个问题吗?如果是,我如何使用 Cloud Run 运行容器并处理 xcom?

提前致谢!!

Ric*_*o D 10

据我所知,当容器部署到 Cloud Run 时,它会自动侦听可能发送的请求。请参阅文档以供参考。

相反,您可以发送请求来访问已部署的容器。您可以使用下面的代码来完成此操作。

该 DAG 具有三个任务print_tokentask_get_op并且process_data

  • print_token打印对已部署的 Cloud Run 容器的请求进行身份验证所需的身份令牌。我使用“xcom_pull”获取“BashOperator”的输出并分配身份验证令牌,token以便这可以用于对您将执行的 HTTP 请求进行身份验证。
  • task_get_op对连接执行 GET cloud_run(仅包含 Cloud Run 端点)并定义'Authorization': 'Bearer ' + token身份验证标头。
  • process_data对“task_get_op”执行“xcom_pull”以获取输出并使用 PythonOperator 打印它。
import datetime

import airflow
from airflow.operators import bash
from airflow.operators import python
from airflow.providers.http.operators.http import SimpleHttpOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_http_request',
        'catchup=False',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    print_token = bash.BashOperator(
        task_id='print_token', 
        bash_command='gcloud auth print-identity-token "--audiences=https://hello-world-fri824-ab.c.run.app"' # The end point of the deployed Cloud Run container
    ) 

    token = "{{ task_instance.xcom_pull(task_ids='print_token') }}" # gets output from 'print_token' task

    task_get_op = SimpleHttpOperator(
        task_id='get_op',
        method='GET',
        http_conn_id='cloud_run',
        headers={'Authorization': 'Bearer ' + token },
    )

    def process_data_from_http(**kwargs):
      ti = kwargs['ti']
      http_data = ti.xcom_pull(task_ids='get_op')
      print(http_data)

    process_data = python.PythonOperator(
      task_id='process_data_from_http',
      python_callable=process_data_from_http,
      provide_context=True
      )
    print_token >> task_get_op >> process_data

Run Code Online (Sandbox Code Playgroud)

cloud_run联系:

在此输入图像描述

输出(图):

在此输入图像描述

print_token 日志:

在此输入图像描述

task_get_op 日志:

在此输入图像描述

process_data 日志(从 GET 输出): 在此输入图像描述

注意:我使用 Cloud Composer 1.17.7 和 Airflow 2.0.2 并安装apache-airflow-providers-http以能够使用SimpleHttpOperator