Asyncio使用StreamReader解码utf-8

Eth*_*rey 10 python encoding asynchronous utf-8 python-asyncio

我习惯了asyncio并且发现任务处理相当不错,但是将异步库与传统的io库混合起来可能很困难.我目前面临的问题是如何正确解码异步StreamReader.

最简单的解决方案是read()块字节字符串,然后解码每个块 - 请参阅下面的代码.(在我的程序中,我不打印每个块,但将其解码为字符串并将其发送到另一个方法进行处理):

import asyncio
import aiohttp

async def get_data(port):
    url = 'http://localhost:{}/'.format(port)
    r = await aiohttp.get(url)
    stream = r.content
    while not stream.at_eof():
        data = await stream.read(4)
        print(data.decode('utf-8'))
Run Code Online (Sandbox Code Playgroud)

这样可以正常工作,直到有一个utf-8字符在太多块之间分开.例如,如果响应是b'M\xc3\xa4dchen mit Bi\xc3\x9f\n',则读取3的块将起作用,但是4的块将不起作用(因为\xc3并且\x9f处于不同的块中并且解码以结尾的块\xc3将引发以下错误:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data

我查看了这个问题的正确解决方案,至少在阻塞世界中,似乎是io.TextIOWrapper或codecs.StreamReaderWriter(其差异在PEP 0400中讨论).但是,这两者都依赖于典型的阻塞流.

我花了30分钟搜索asyncio的例子并继续寻找我的decode()解决方案.有没有人知道更好的解决方案,或者这是python的asyncio中缺少的功能?

作为参考,以下是使用具有异步流的两个"标准"解码器的结果.

使用编解码器流阅读器:

r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)
Run Code Online (Sandbox Code Playgroud)

例外:

File "echo_client.py", line 13, in get_data
  data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
  data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator
Run Code Online (Sandbox Code Playgroud)

(它调用read()方法直接,而不是yield fromawait它)

我还尝试用io.TextIOWrapper包装流:

stream = TextIOWrapper(r.content)
Run Code Online (Sandbox Code Playgroud)

但这导致以下情况:

File "echo_client.py", line 10, in get_data
  stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'
Run Code Online (Sandbox Code Playgroud)

PS如果你想要一个样本测试用例,请看这个要点.您可以使用python3.5运行它来重现错误.如果将块大小从4更改为3(或30),它将正常工作.

编辑

接受的答案修正了这个魅力.谢谢!如果其他人有这个问题,这里是一个简单的包装类,我在StreamReader上处理解码:

import codecs

class DecodingStreamReader:
    def __init__(self, stream, encoding='utf-8', errors='strict'):
        self.stream = stream
        self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)

    async def read(self, n=-1):
        data = await self.stream.read(n)
        if isinstance(data, (bytes, bytearray)):
            data = self.decoder.decode(data)
        return data

    def at_eof(self):
        return self.stream.at_eof() 
Run Code Online (Sandbox Code Playgroud)

Vin*_*ent 5

您可以使用IncrementalDecoder:

Utf8Decoder = codecs.getincrementaldecoder('utf-8')
Run Code Online (Sandbox Code Playgroud)

用你的例子:

decoder = Utf8Decoder(error='strict')
while not stream.at_eof():
    data = await stream.read(4)
    print(decoder.decode(data), end='')
Run Code Online (Sandbox Code Playgroud)

输出:

Mädchen mit Biß
Run Code Online (Sandbox Code Playgroud)