使用Python"请求"库读取流式http响应

lar*_*sks 10 python stream python-requests kubernetes

我正在尝试 使用模块使用Kubernetes api 提供的事件流 requests.我遇到了一个看起来像缓冲问题:requests模块似乎滞后于一个事件.

我的代码看起来像这样:

r = requests.get('http://localhost:8080/api/v1beta1/watch/services',
                 stream=True)

for line in r.iter_lines():
    print 'LINE:', line
Run Code Online (Sandbox Code Playgroud)

当Kubernetes发出事件通知时,此代码将仅显示新事件发生时发出的最后一个事件,这使得对于需要响应服务添加/删除事件的代码几乎完全没用.

我通过curl在子进程中生成而不是使用requests库来解决这个问题:

p = subprocess.Popen(['curl', '-sfN',
                      'http://localhost:8080/api/watch/services'],
                     stdout=subprocess.PIPE,
                     bufsize=1)

for line in iter(p.stdout.readline, b''):
    print 'LINE:', line
Run Code Online (Sandbox Code Playgroud)

这有效,但牺牲了一些灵活性.有没有办法避免requests库的缓冲问题?

lar*_*sks 6

此行为是由于库中iter_lines 方法的错误实现requests.

iter_lineschunk_size使用iter_content迭代器迭代数据块中的响应内容.如果chunk_size可用于从远程服务器读取的数据少于 字节(通常在读取最后一行输出时就是这种情况),则读取操作将阻塞,直到chunk_size数据字节可用.

我编写了自己iter_lines正常运行的例程:

import os


def iter_lines(fd, chunk_size=1024):
    '''Iterates over the content of a file-like object line-by-line.'''

    pending = None

    while True:
        chunk = os.read(fd.fileno(), chunk_size)
        if not chunk:
            break

        if pending is not None:
            chunk = pending + chunk
            pending = None

        lines = chunk.splitlines()

        if lines and lines[-1]:
            pending = lines.pop()

        for line in lines:
            yield line

    if pending:
        yield(pending)
Run Code Online (Sandbox Code Playgroud)

这是有效的,因为os.read将返回少于chunk_size字节的数据而不是等待缓冲区填充.