Dat*_*and 7 https google-cloud-platform airflow
我正在对 Google Cloud 上的 Airflow 进行一些概念验证。
本质上,我想创建一个工作流,从 REST API (https) 下载数据,将此数据转换为 JSON 格式并将其上传到 Google Cloud 存储单元。
我已经用纯 Python 代码完成了这项工作,并且可以正常工作。很直接!但是因为我想安排这个并且有一些依赖关系,所以 Airflow 应该是这个的理想工具。
在仔细阅读 Airflow 文档后,我看到 HttpOperator 和/或 HttpHook 可以为下载部分提供技巧。
我已经使用我的电子邮件/密码将我的 Http 连接创建到 WebUI 中以进行授权,如下所示:
{Conn Id: "atlassian_marketplace", Conn Type: "HTTP", Host: " https://marketplace.atlassian.com/rest/2 ", Schema: None/Blank, Login: "my username", Password: "my密码”,端口:无/空白,额外:无/空白}
第一个问题: - 何时使用 SimpleHttpOperator 与 HttpHook?
第二个问题: - 我们如何使用 SimpleHttpOperator 或 HttpHook 进行 HTTPs 调用?
第三个问题:-我们如何访问API调用返回的数据?
就我而言,XCOM 功能不会起作用,因为这些 API 调用会返回大量数据(100-300mb)!
我已经在谷歌上找到了一个关于如何在我的用例中使用操作符/钩子的示例代码,但我还没有找到任何有用的东西。
有任何想法吗?
到目前为止,我把我的代码框架放在这里。
# Usual Airflow import
# Dag creation
dag = DAG(
'get_reporting_links',
default_args=default_args,
description='Get reporting links',
schedule_interval=timedelta(days=1))
# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)
# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
http_conn_id="atlassian_marketplace",
endpoint="/vendors/{vendorId}/reporting".format({vendorId: "some number"}),
method="GET")
# Task 3: Save JSON data locally
# TODO: transform_json: transform to JSON get_data.json()?
# Task 4: Upload data to GCP
# TODO: upload_gcs: use Airflow GCS connection
# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)
# Dependencies
start >> get_data >> transform_json >> upload_gcs >> stop
Run Code Online (Sandbox Code Playgroud)
看下面的例子:
# Usual Airflow import
# Dag creation
dag = DAG(
'get_reporting_links',
default_args=default_args,
description='Get reporting links',
schedule_interval=timedelta(days=1))
# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)
# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
task_id="get_data",
http_conn_id="atlassian_marketplace",
endpoint="/vendors/{vendorId}/reporting".format({vendorId: "some number"}),
method="GET",
xcom_push=True,
)
def transform_json(**kwargs):
ti = kwargs['ti']
pulled_value_1 = ti.xcom_pull(key=None, task_ids='get_data')
...
# transform the json here and save the content to a file
# Task 3: Save JSON data locally
save_and_transform = PythonOperator(
task_id="save_and_transform",
python_callable=transform_json,
provide_context=True,
)
# Task 4: Upload data to GCP
upload_to_gcs = FileToGoogleCloudStorageOperator(...)
# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)
# Dependencies
start >> get_data >> save_and_transform >> upload_to_gcs >> stop
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4030 次 |
| 最近记录: |