Airflow 中用于 HTTPS 的 HttpOperator 或 HttpHook

Dat*_*and 7 https google-cloud-platform airflow

我正在对 Google Cloud 上的 Airflow 进行一些概念验证。

本质上,我想创建一个工作流,从 REST API (https) 下载数据,将此数据转换为 JSON 格式并将其上传到 Google Cloud 存储单元。

我已经用纯 Python 代码完成了这项工作,并且可以正常工作。很直接!但是因为我想安排这个并且有一些依赖关系,所以 Airflow 应该是这个的理想工具。

在仔细阅读 Airflow 文档后,我看到 HttpOperator 和/或 HttpHook 可以为下载部分提供技巧。

我已经使用我的电子邮件/密码将我的 Http 连接创建到 WebUI 中以进行授权,如下所示:

{Conn Id: "atlassian_marketplace", Conn Type: "HTTP", Host: " https://marketplace.atlassian.com/rest/2 ", Schema: None/Blank, Login: "my username", Password: "my密码”,端口:无/空白,额外:无/空白}

第一个问题: - 何时使用 SimpleHttpOperator 与 HttpHook?

第二个问题: - 我们如何使用 SimpleHttpOperator 或 HttpHook 进行 HTTPs 调用?

第三个问题:-我们如何访问API调用返回的数据?

就我而言,XCOM 功能不会起作用,因为这些 API 调用会返回大量数据(100-300mb)!

我已经在谷歌上找到了一个关于如何在我的用例中使用操作符/钩子的示例代码,但我还没有找到任何有用的东西。

有任何想法吗?

到目前为止,我把我的代码框架放在这里。

# Usual Airflow import

# Dag creation
dag = DAG(
    'get_reporting_links',
    default_args=default_args,
    description='Get reporting links',
    schedule_interval=timedelta(days=1))

# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)

# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
            http_conn_id="atlassian_marketplace",
          endpoint="/vendors/{vendorId}/reporting".format({vendorId: "some number"}), 
            method="GET")

# Task 3: Save JSON data locally
# TODO: transform_json: transform to JSON get_data.json()?

# Task 4: Upload data to GCP
# TODO: upload_gcs: use Airflow GCS connection

# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)

# Dependencies
start >> get_data >> transform_json >> upload_gcs >> stop
Run Code Online (Sandbox Code Playgroud)

kax*_*xil 4

看下面的例子:

# Usual Airflow import

# Dag creation
dag = DAG(
    'get_reporting_links',
    default_args=default_args,
    description='Get reporting links',
    schedule_interval=timedelta(days=1))

# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)

# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
     task_id="get_data",
     http_conn_id="atlassian_marketplace",
     endpoint="/vendors/{vendorId}/reporting".format({vendorId: "some number"}), 
     method="GET",
     xcom_push=True,
)

def transform_json(**kwargs):
    ti = kwargs['ti']
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='get_data')
    ...
    # transform the json here and save the content to a file


# Task 3: Save JSON data locally
save_and_transform = PythonOperator(
    task_id="save_and_transform", 
    python_callable=transform_json,
    provide_context=True,
)

# Task 4: Upload data to GCP
upload_to_gcs = FileToGoogleCloudStorageOperator(...)

# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)

# Dependencies
start >> get_data >> save_and_transform >> upload_to_gcs >> stop
Run Code Online (Sandbox Code Playgroud)