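I'm trying to consume a RESTful API with SimpleHttpOperator. However, as the name suggests, it only supports the HTTP protocol, and I need to call an HTTPS URI. So for now I have to either use the Python `requests` library directly or handle the calls in application code. That may not be the standard approach, though, so I'm looking for any other options for calling HTTPS URIs from Airflow. Thanks.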
I dug into this and I'm fairly sure this behavior is a bug in Airflow. I created a ticket here: https://issues.apache.org/jira/browse/AIRFLOW-2910
For now, the best you can do is override SimpleHttpOperator and HttpHook so that HttpHook.get_conn works differently (accepts https). I may end up doing this, and if I do, I'll post some code.
Update:
Operator override:
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.exceptions import AirflowException
# Project-specific module path; point this at wherever HttpsHook lives in your repo
from operators.https_support.https_hook import HttpsHook


class HttpsOperator(SimpleHttpOperator):
    """SimpleHttpOperator that uses HttpsHook, so https connections work."""

    def execute(self, context):
        # Same as SimpleHttpOperator.execute, but with HttpsHook instead of HttpHook
        http = HttpsHook(self.method, http_conn_id=self.http_conn_id)

        self.log.info("Calling HTTP method")

        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
        if self.xcom_push_flag:
            # Returned value is pushed to XCom
            return response.text
Hook override:
from airflow.hooks.http_hook import HttpHook
import requests


class HttpsHook(HttpHook):
    def get_conn(self, headers=None):
        """
        Returns http session for use with requests. Supports https.
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()

        if "://" in conn.host:
            # Host already includes the scheme, e.g. https://api.example.com
            self.base_url = conn.host
        elif conn.schema:
            self.base_url = conn.schema + "://" + conn.host
        elif conn.conn_type:  # https support
            self.base_url = conn.conn_type + "://" + conn.host
        else:
            # schema defaults to HTTP
            self.base_url = "http://" + conn.host

        if conn.port:
            self.base_url = self.base_url + ":" + str(conn.port) + "/"
        if conn.login:
            session.auth = (conn.login, conn.password)
        if conn.extra:
            # Keep applying the connection's Extra JSON as headers, as the base HttpHook does
            session.headers.update(conn.extra_dejson)
        if headers:
            session.headers.update(headers)

        return session
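To check which scheme the override picks without an Airflow install, here is a minimal pure-Python sketch of the same branch order: a scheme already in Host wins, then the Schema field, then the connection type, then plain http. `FakeConnection` and `resolve_base_url` are hypothetical names, and `api.example.com` is a placeholder host; only the fields `get_conn` reads are modelled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeConnection:
    """Hypothetical stand-in for an Airflow Connection (only the fields used here)."""
    host: str
    schema: Optional[str] = None
    conn_type: Optional[str] = None
    port: Optional[int] = None


def resolve_base_url(conn: FakeConnection) -> str:
    """Pick the base URL using the same precedence as HttpsHook.get_conn."""
    if "://" in conn.host:
        # Host already carries a scheme
        base_url = conn.host
    elif conn.schema:
        # Schema field set explicitly, e.g. "https"
        base_url = conn.schema + "://" + conn.host
    elif conn.conn_type:
        # Fall back to the connection type (the override's https support)
        base_url = conn.conn_type + "://" + conn.host
    else:
        # Default to plain HTTP
        base_url = "http://" + conn.host
    if conn.port:
        base_url = base_url + ":" + str(conn.port) + "/"
    return base_url


if __name__ == "__main__":
    print(resolve_base_url(FakeConnection(host="api.example.com", conn_type="https")))
    # → https://api.example.com
    print(resolve_base_url(FakeConnection(host="api.example.com", schema="https", port=8443)))
    # → https://api.example.com:8443/
```

Note that the Schema field takes precedence over the connection type, so a connection with `schema="https"` and `conn_type="http"` still resolves to https.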
Usage:
A drop-in replacement for SimpleHttpOperator.
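This is a few months old, but for what it's worth, I had no problems making HTTPS calls on Airflow 1.10.2.
In my initial test I was requesting templates from SendGrid, so the connection was set up as follows: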
Conn Id : sendgrid_templates_test
Conn Type : HTTP
Host : https://api.sendgrid.com/
Extra : { "authorization": "Bearer [my token]"}
Then, in the DAG code:
get_templates = SimpleHttpOperator(
    task_id='get_templates',
    method='GET',
    endpoint='/v3/templates',
    http_conn_id='sendgrid_templates_test',
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag,
)
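The log further down shows the request going to `https://api.sendgrid.com//v3/templates`, with a doubled slash. A simple join rule reproduces that: add a "/" only when neither the base URL nor the endpoint has one, otherwise concatenate. The sketch below is a hypothetical helper (`join_url`) inferred from that log line, not copied from Airflow's source.

```python
def join_url(base_url: str, endpoint: str) -> str:
    """Join a base URL and an endpoint the simple way.

    A "/" is inserted only when the base URL does not end with one and the
    endpoint does not start with one; otherwise the two strings are just
    concatenated, so a duplicated "/" is kept.
    """
    if base_url and not base_url.endswith("/") and endpoint and not endpoint.startswith("/"):
        return base_url + "/" + endpoint
    return (base_url or "") + (endpoint or "")


if __name__ == "__main__":
    # Host from the sendgrid_templates_test connection + endpoint from the task
    print(join_url("https://api.sendgrid.com/", "/v3/templates"))
    # → https://api.sendgrid.com//v3/templates
    # Dropping the trailing "/" from Host avoids the double slash
    print(join_url("https://api.sendgrid.com", "/v3/templates"))
    # → https://api.sendgrid.com/v3/templates
```

The request still succeeded here, but setting Host to `https://api.sendgrid.com` (no trailing slash) keeps the final URL clean.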
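This worked. Also note that my request runs after a BranchPythonOperator, so I had to set the trigger rule accordingly ("all_done", so the task fires even if one of the branches is skipped). That's unrelated to the question, but I wanted to point it out.
To be clear, I did get an InsecureRequestWarning because I hadn't enabled certificate verification. But you can see the resulting log below: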
[2019-02-21 16:15:01,333] {http_operator.py:89} INFO - Calling HTTP method
[2019-02-21 16:15:01,336] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,335] {base_hook.py:83} INFO - Using connection to: id: sendgrid_templates_test. Host: https://api.sendgrid.com/, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: {'authorization': 'Bearer [my token]'}
[2019-02-21 16:15:01,338] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,337] {http_hook.py:126} INFO - Sending 'GET' to url: https://api.sendgrid.com//v3/templates
[2019-02-21 16:15:01,956] {logging_mixin.py:95} WARNING - /home/csconnell/.pyenv/versions/airflow/lib/python3.6/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
InsecureRequestWarning)
[2019-02-21 16:15:05,242] {logging_mixin.py:95} INFO - [2019-02-21 16:15:05,241] {jobs.py:2527} INFO - Task exited with return code 0