如何将iterable转换为流?

Dav*_*ver 12 python iterator stream

如果我有一个包含字符串的iterable,是否有一种简单的方法可以将其转换为流?我想做这样的事情:

def make_file():
    yield "hello\n"
    yield "world\n"

output = tarfile.TarFile(…)
stream = iterable_to_stream(make_file())
output.addfile(…, stream)
Run Code Online (Sandbox Code Playgroud)

Mec*_*ail 28

Python 3有一个新的I/O流API(库文档),取代了旧的类文件对象协议.(新的API在io模块中也可以在Python 2中使用,并且它与文件类对象协议向后兼容.)

这是 Python 2和3中新API的实现:

import io

def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = None
        def readable(self):
            return True
        def readinto(self, b):
            try:
                l = len(b)  # We're supposed to return at most this much
                chunk = self.leftover or next(iterable)
                output, self.leftover = chunk[:l], chunk[l:]
                b[:len(output)] = output
                return len(output)
            except StopIteration:
                return 0    # indicate EOF
    return io.BufferedReader(IterStream(), buffer_size=buffer_size)
Run Code Online (Sandbox Code Playgroud)

用法示例:

with iterable_to_stream(str(x**2).encode('utf8') for x in range(11)) as s:
    print(s.read())
Run Code Online (Sandbox Code Playgroud)

  • 到了 2020 年,Python 3.8 仍然是最好的方法吗?尝试了一下,仍然有效,但也许可以简化? (6认同)

Dav*_*ver 12

由于它看起来不像是采用"标准"方式,因此我将一个简单的实现组合在一起:

class iter_to_stream(object):
    def __init__(self, iterable):
        self.buffered = ""
        self.iter = iter(iterable)

    def read(self, size):
        result = ""
        while size > 0:
            data = self.buffered or next(self.iter, None)
            self.buffered = ""
            if data is None:
                break
            size -= len(data)
            if size < 0:
                data, self.buffered = data[:size], data[size:]
            result += data
        return result
Run Code Online (Sandbox Code Playgroud)


sha*_*zow 12

这是我的流迭代器urllib3的实验分支,通过iterables支持流分块请求:

class IterStreamer(object):
    """
    File-like streaming iterator.
    """
    def __init__(self, generator):
        self.generator = generator
        self.iterator = iter(generator)
        self.leftover = ''

    def __len__(self):
        return self.generator.__len__()

    def __iter__(self):
        return self.iterator

    def next(self):
        return self.iterator.next()

    def read(self, size):
        data = self.leftover
        count = len(self.leftover)

        if count < size:
            try:
                while count < size:
                    chunk = self.next()
                    data += chunk
                    count += len(chunk)
            except StopIteration:
                pass

        self.leftover = data[size:]

        return data[:size]
Run Code Online (Sandbox Code Playgroud)

上下文来源:https: //github.com/shazow/urllib3/blob/filepost-stream/urllib3/filepost.py#L23

相关单元测试:https: //github.com/shazow/urllib3/blob/filepost-stream/test/test_filepost.py#L9

唉,这段代码还没有进入稳定的分支,但是由于没有大量的分块请求得不到支持,但它应该是你想要做的事情的良好基础.有关如何使用它的示例,请参阅源链接.