用于 HTTPS 的气流 SimpleHttpOperator

Kri*_*ris 5 https airflow

我正在尝试使用 SimpleHttpOperator 来使用 RESTful API。但是,顾名思义,它只支持我需要使用 HTTPS URI 的 HTTP 协议。所以,现在,我必须使用 Python 中的“请求”对象或处理应用程序代码中的调用。但是,这可能不是标准方式。因此,我正在寻找可用于从 Airflow 中使用 HTTPS URI 的任何其他选项。谢谢。

mel*_*r55 6

我深入研究了这一点,并且很确定这种行为是气流中的一个错误。我在这里创建了一张票:https : //issues.apache.org/jira/browse/AIRFLOW-2910

目前,您能做的最好的事情是覆盖 SimpleHttpOperator 和 HttpHook,以改变 HttpHook.get_conn 的工作方式(接受 https)。我可能最终会这样做,如果我这样做,我会发布一些代码。

更新:

操作员覆盖:

from airflow.operators.http_operator import SimpleHttpOperator
from airflow.exceptions import AirflowException
from operators.https_support.https_hook import HttpsHook


class HttpsOperator(SimpleHttpOperator):
    def execute(self, context):
        http = HttpsHook(self.method, http_conn_id=self.http_conn_id)

        self.log.info("Calling HTTP method")

        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
        if self.xcom_push_flag:
            return response.text
Run Code Online (Sandbox Code Playgroud)

钩子覆盖

from airflow.hooks.http_hook import HttpHook
import requests


class HttpsHook(HttpHook):
    def get_conn(self, headers):
        """
        Returns http session for use with requests. Supports https.
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()

        if "://" in conn.host:
            self.base_url = conn.host
        elif conn.schema:
            self.base_url = conn.schema + "://" + conn.host
        elif conn.conn_type:  # https support
            self.base_url = conn.conn_type + "://" + conn.host
        else:
            # schema defaults to HTTP
            self.base_url = "http://" + conn.host

        if conn.port:
            self.base_url = self.base_url + ":" + str(conn.port) + "/"
        if conn.login:
            session.auth = (conn.login, conn.password)
        if headers:
            session.headers.update(headers)

        return session
Run Code Online (Sandbox Code Playgroud)

用法:

SimpleHttpOperator 的替代品。


小智 5

这已经有几个月的历史了,但就其价值而言,我在 Airflow 1.10.2 上进行 HTTPS 调用没有任何问题。

在我最初的测试中,我从 sendgrid 请求模板,因此连接设置如下:

Conn Id   : sendgrid_templates_test
Conn Type : HTTP   
Host      :   https://api.sendgrid.com/
Extra     : { "authorization": "Bearer [my token]"}
Run Code Online (Sandbox Code Playgroud)

然后在 dag 代码中:

get_templates = SimpleHttpOperator(
        task_id='get_templates',
        method='GET',
        endpoint='/v3/templates',
        http_conn_id = 'sendgrid_templates_test',
        trigger_rule="all_done",
        xcom_push=True
        dag=dag,
    )
Run Code Online (Sandbox Code Playgroud)

这奏效了。另请注意,我的请求发生在 Branch Operator 之后,因此我需要适当地设置触发规则(设置为“all_done”以确保即使跳过其中一个分支也能触发),这与问题无关,但是我只是想指出来。

现在要清楚,我确实收到了不安全请求警告,因为我没有启用证书验证。但是你可以在下面看到结果日志

[2019-02-21 16:15:01,333] {http_operator.py:89} INFO - Calling HTTP method
[2019-02-21 16:15:01,336] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,335] {base_hook.py:83} INFO - Using connection to: id: sendgrid_templates_test. Host:  https://api.sendgrid.com/, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: {'authorization': 'Bearer [my token]'}
[2019-02-21 16:15:01,338] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,337] {http_hook.py:126} INFO - Sending 'GET' to url:  https://api.sendgrid.com//v3/templates
[2019-02-21 16:15:01,956] {logging_mixin.py:95} WARNING - /home/csconnell/.pyenv/versions/airflow/lib/python3.6/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
[2019-02-21 16:15:05,242] {logging_mixin.py:95} INFO - [2019-02-21 16:15:05,241] {jobs.py:2527} INFO - Task exited with return code 0
Run Code Online (Sandbox Code Playgroud)