在压缩的,分块的HTTP流到达时有效地读取行

Tho*_* B. 8 python http-streaming python-requests

我编写了一个HTTP-Server,它生成由JSON结构事件组成的无限HTTP流.与Twitter的流媒体API类似.这些事件由\n(根据服务器发送的事件和Content-Type:text/event-stream)分隔,并且长度可以不同.

回应是

  • chunked(HTTP 1.1 Transfer-Encoding:chunked)由于源源不断
  • 压缩(Content-Encoding:gzip)以节省带宽.

我想尽快在Python中使用这些行,并尽可能节省资源,而不需要重新发明轮子.

由于我目前正在使用python-requests,你知道如何让它工作吗?如果您认为,python-requests在这里无法提供帮助,我对其他框架/库完全开放.

我当前的实现基于请求iter_lines(...)接收线路的用途.但chunk_size参数很棘手.如果设置1为非常强大,因为某些事件可能是几千字节.如果设置为大于1的任何值,则某些事件会一直停留到下一个到达并且整个缓冲区"已填满".事件之间的时间可能是几秒钟.我期望它chunk_size是某种"接收的最大字节数",就像在unix中一样recv(...).相应的手册页说:

接收呼叫通常会返回任何可用的数据,直到请求的数量,而不是等待收到所请求的全部金额.

但这显然不是它在请求库中的工作方式.他们或多或少地使用它作为"接收的确切字节数".在查看源代码时,我无法确定哪个部分对此负责.也许httplib的Response或ssl的SSLSocket.

作为一种解决方法,我尝试将服务器上的线路填充到块大小的倍数.但是,请求库中的块大小用于从压缩响应流中获取字节.所以这不会起作用,直到我可以填充我的行,使他们的压缩字节序列是块大小的倍数.但这似乎太过于苛刻了.

我已经读过Twisted可以用于客户端上的http流的非阻塞,非缓冲处理,但我只找到了在服务器上创建流响应的代码.

Tho*_* B. 8

感谢Martijn Pieters的回答,我停止了解决python请求行为,并寻找一种完全不同的方法.

我最终使用了pyCurl.您可以使用类似于select + recv循环而不反转控制流并放弃对Tornado等专用IO循环的控制.这样很容易使用一个生成器,一旦它们到达就产生新的生产线 - 在中间层没有进一步缓冲,可能会引入延迟或运行IO循环的其他线程.

同时,它足够高级,您不需要打扰分块传输编码,SSL加密或gzip压缩.

这是我的旧代码,其中chunk_size= 1导致45%的CPU负载,chunk_size> 1导致额外的延迟.

import requests
class RequestsHTTPStream(object):
    def __init__(self, url):
        self.url = url

    def iter_lines(self):
        headers = {'Cache-Control':'no-cache',
                   'Accept': 'text/event-stream',
                   'Accept-Encoding': 'gzip'}
        response = requests.get(self.url, stream=True, headers=headers)
        return response.iter_lines(chunk_size=1)
Run Code Online (Sandbox Code Playgroud)

这是我基于pyCurl的新代码:(不幸的是curl_easy_*样式perform完全阻塞,这使得在不使用线程的情况下难以产生线条.因此我使用curl_multi_*方法)

import pycurl
import urllib2
import httplib
import StringIO

class CurlHTTPStream(object):
    def __init__(self, url):
        self.url = url
        self.received_buffer = StringIO.StringIO()

        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
        self.curl.setopt(pycurl.ENCODING, 'gzip')
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)

        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.add_handle(self.curl)

        self.status_code = 0

    SELECT_TIMEOUT = 10

    def _any_data_received(self):
        return self.received_buffer.tell() != 0

    def _get_received_data(self):
        result = self.received_buffer.getvalue()
        self.received_buffer.truncate(0)
        self.received_buffer.seek(0)
        return result

    def _check_status_code(self):
        if self.status_code == 0:
            self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
        if self.status_code != 0 and self.status_code != httplib.OK:
            raise urllib2.HTTPError(self.url, self.status_code, None, None, None)

    def _perform_on_curl(self):
        while True:
            ret, num_handles = self.curlmulti.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        return num_handles

    def _iter_chunks(self):
        while True:
            remaining = self._perform_on_curl()
            if self._any_data_received():
                self._check_status_code()
                yield self._get_received_data()
            if remaining == 0:
                break
            self.curlmulti.select(self.SELECT_TIMEOUT)

        self._check_status_code()
        self._check_curl_errors()

    def _check_curl_errors(self):
        for f in self.curlmulti.info_read()[2]:
            raise pycurl.error(*f[1:])

    def iter_lines(self):
        chunks = self._iter_chunks()
        return self._split_lines_from_chunks(chunks)

    @staticmethod
    def _split_lines_from_chunks(chunks):
        #same behaviour as requests' Response.iter_lines(...)

        pending = None
        for chunk in chunks:

            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()

            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None

            for line in lines:
                yield line

        if pending is not None:
            yield pending
Run Code Online (Sandbox Code Playgroud)

此代码尝试从传入流中获取尽可能多的字节,如果只有少数则不必要地阻塞.相比之下,CPU负载约为0.2%


Mar*_*ers 6

requests你的iter_lines()电话阻塞并不是"错" .

Response.iter_lines()方法调用Response.iter_content(),它调用urllib3HTTPResponse.stream(),它调用HTTPResponse.read().

这些调用传递一个块大小,这是传递给套接字的大小self._fp.read(amt).这是一个有问题的调用,就像self._fp生成的文件对象一样socket.makefile()(由httplib模块完成); 并且此.read()调用阻塞,直到amt读取(压缩)字节为止.

这个低级套接字文件对象确实支持.readline()更高效的调用,但urllib3在处理压缩数据时无法使用此调用; 行终止符不会在压缩流中可见.

不幸的是,当响应没有被压缩时urllib3也不会调用self._fp.readline(); 调用结构的方式很难传递你想要在线缓冲模式而不是在块缓冲模式下读取.

我必须说HTTP不是用于流事件的最佳协议; 我会为此使用不同的协议.Websockets是我们想到的,或者是针对您的特定用例的自定义协议.