Python socket readline without socket.makefile()

Dan*_*ork 5 python sockets

I'm trying to parse an HTTP request line (e.g. 'GET / HTTP/1.1\r\n'), which is easy to do with socket.makefile().readline() (BaseHTTPRequestHandler uses it), for example:

print(sock.makefile().readline())

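Unfortunately, as the documentation states, the socket must be in blocking mode (it cannot have a timeout) when makefile() is used. How can I implement a readline()-like function that does the same job without the makefile() file-object interface and without reading more than necessary (since the extra data, which I need later, would be discarded)?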

A very inefficient example:

request_line = b""
while not request_line.endswith(b"\n"):
    request_line += sock.recv(1)  # one byte per call, so nothing extra is consumed
print(request_line)

jed*_*rds 8

Four and a half years later, I would suggest asyncio's Streams for this, but here is how you might do it properly with BytesIO.

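Note that this implementation "shrinks" the in-memory BytesIO object each time a line is detected. If you don't care about that, this could be a lot fewer lines.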

import socket
import time
from io import BytesIO

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(False)


def handle_line(line):
    # or, print("Line Received:", line.decode().rstrip())
    print(f"Line Received: {line.decode().rstrip()!r}")


with BytesIO() as buffer:
    while True:
        try:
            resp = sock.recv(100)       # Read in some number of bytes -- balance this
        except BlockingIOError:
            print("sleeping")           # Do whatever you want here, this just
            time.sleep(2)               #   illustrates that it's nonblocking
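        else:
            if not resp:                # Empty read: the peer closed the connection
                break
            buffer.write(resp)          # Write to the BytesIO object
            buffer.seek(0)              # Set the file pointer to the start of the buffer
            start_index = 0             # Count the number of bytes processed
            for line in buffer:
                if not line.endswith(b"\n"):
                    break               # Trailing partial line: keep it buffered
                start_index += len(line)
                handle_line(line)       # Do something with your line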

            """ If we received any newline-terminated lines, this will be nonzero.
                In that case, we read the remaining bytes into memory, truncate
                the BytesIO object, reset the file pointer and re-write the
                remaining bytes back into it.  This will advance the file pointer
                appropriately.  If start_index is zero, the buffer doesn't contain
                any newline-terminated lines, so we set the file pointer to the
                end of the file to not overwrite bytes.
            """
            if start_index:
                buffer.seek(start_index)
                remaining = buffer.read()
                buffer.truncate(0)
                buffer.seek(0)
                buffer.write(remaining)
            else:
                buffer.seek(0, 2)

(The original answer was so bad that it wasn't worth keeping (I promise), but it should still be available in the edit history.)
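For reference, the asyncio Streams route mentioned at the top of this answer might look roughly like the sketch below. The names `read_request_line` and `demo`, the 5-second timeout, and the loopback server are illustrative assumptions, not part of the original answer; only `asyncio.start_server`, `asyncio.open_connection`, `StreamReader.readline()` and `asyncio.wait_for` are real library calls.

```python
import asyncio


async def read_request_line(reader: asyncio.StreamReader,
                            timeout: float = 5.0) -> bytes:
    # readline() returns the bytes up to and including b"\n";
    # wait_for() adds the timeout that makefile() cannot offer.
    return await asyncio.wait_for(reader.readline(), timeout)


async def demo() -> bytes:
    # Hypothetical round trip on the loopback interface, for illustration only.
    loop = asyncio.get_running_loop()
    result = loop.create_future()

    async def handle(reader, writer):
        result.set_result(await read_request_line(reader))
        writer.close()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]  # port chosen by the OS
    _, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"GET / HTTP/1.1\r\n")
    await writer.drain()
    line = await result
    writer.close()
    server.close()
    return line


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Bytes that arrive after the newline stay in the StreamReader's internal buffer, so later reads still see them.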

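  • This answer has several problems. First, `StringIO` does not accept a buffer size as an argument; the construction above produces a buffer containing the literal content "2048" (i.e. a four-character string). It also discards any data received after the first newline is seen. (11 upvotes)
  • Works well and efficiently with one small modification (at least for me). I needed to add a check for whether the line actually ends with a newline, because the `for line in buffer:` statement also returns the rest of the buffer even when it contains no newline. So my for statement looks like this, which made it work: `for line in buffer: if line.endswith(b'\n'): start_index += len(line) handle_line(line)` (2 upvotes)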
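The newline check described in the last comment can also be written without BytesIO. The following is a minimal sketch, assuming the received bytes are accumulated in a plain `bytearray`; `split_complete_lines` is a hypothetical helper name, not something from the answer above.

```python
from typing import List


def split_complete_lines(buffer: bytearray) -> List[bytes]:
    """Remove and return every newline-terminated line in `buffer`.

    A trailing partial line (no b"\\n" yet) is left in the buffer so it
    can be completed by the next recv().
    """
    lines = []
    while True:
        idx = buffer.find(b"\n")
        if idx == -1:
            return lines
        lines.append(bytes(buffer[: idx + 1]))
        del buffer[: idx + 1]  # shrink the buffer in place


buf = bytearray()
buf += b"GET / HTTP/1.1\r\nHo"      # first chunk ends in the middle of a line
first = split_complete_lines(buf)   # only the complete request line
buf += b"st: example\r\n"           # the rest of the second line arrives
second = split_complete_lines(buf)
print(first, second, bytes(buf))
```

After each `sock.recv(...)` the new bytes would be appended to `buf` and the helper called once; whatever it leaves behind is the start of the next line.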

Mat*_*haq 6

SocketStreamReader

Here is a buffered line reader that does not use asyncio. It can be used as a "synchronous", socket-based substitute for asyncio.StreamReader.

import socket
from asyncio import IncompleteReadError  # only import the exception class


class SocketStreamReader:
    def __init__(self, sock: socket.socket):
        self._sock = sock
        self._recv_buffer = bytearray()

    def read(self, num_bytes: int = -1) -> bytes:
        raise NotImplementedError

    def readexactly(self, num_bytes: int) -> bytes:
        buf = bytearray(num_bytes)
        pos = 0
        while pos < num_bytes:
            n = self._recv_into(memoryview(buf)[pos:])
            if n == 0:
                raise IncompleteReadError(bytes(buf[:pos]), num_bytes)
            pos += n
        return bytes(buf)

    def readline(self) -> bytes:
        return self.readuntil(b"\n")

    def readuntil(self, separator: bytes = b"\n") -> bytes:
        if len(separator) != 1:
            raise ValueError("Only separators of length 1 are supported.")

        chunk = bytearray(4096)
        start = 0
        buf = bytearray(len(self._recv_buffer))
        bytes_read = self._recv_into(memoryview(buf))
        assert bytes_read == len(buf)

        while True:
            idx = buf.find(separator, start)
            if idx != -1:
                break

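            start = len(buf)  # resume the search where the previous one stopped
            bytes_read = self._recv_into(memoryview(chunk))
            if bytes_read == 0:
                # The peer closed the connection before the separator arrived.
                raise IncompleteReadError(bytes(buf), None)
            buf += memoryview(chunk)[:bytes_read]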

        result = bytes(buf[: idx + 1])
        self._recv_buffer = b"".join(
            (memoryview(buf)[idx + 1 :], self._recv_buffer)
        )
        return result

    def _recv_into(self, view: memoryview) -> int:
        bytes_read = min(len(view), len(self._recv_buffer))
        view[:bytes_read] = self._recv_buffer[:bytes_read]
        self._recv_buffer = self._recv_buffer[bytes_read:]
        if bytes_read == len(view):
            return bytes_read
        bytes_read += self._sock.recv_into(view[bytes_read:])
        return bytes_read

Usage:

reader = SocketStreamReader(sock)
line = reader.readline()
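Since the question is about sockets with a timeout, one detail worth watching in any buffered reader is where half-read data lives when `recv` raises. The sketch below is a separate, simplified reader and not the class above: `TimeoutSafeLineReader` and `demo` are hypothetical names, and `socket.socketpair()` is used only to make the example self-contained. Partial data is appended to the instance buffer before the next `recv`, so a timeout in the middle of a line should not drop it.

```python
import socket


class TimeoutSafeLineReader:
    """Hypothetical helper: keeps unread bytes on the instance so a
    socket timeout in the middle of a line does not lose data."""

    def __init__(self, sock: socket.socket, chunk_size: int = 4096):
        self._sock = sock
        self._chunk_size = chunk_size
        self._buffer = bytearray()

    def readline(self) -> bytes:
        while True:
            idx = self._buffer.find(b"\n")
            if idx != -1:
                line = bytes(self._buffer[: idx + 1])
                del self._buffer[: idx + 1]  # keep whatever follows the line
                return line
            # recv() may raise socket.timeout; self._buffer is untouched then.
            data = self._sock.recv(self._chunk_size)
            if not data:
                raise EOFError("connection closed before a full line arrived")
            self._buffer += data


def demo():
    a, b = socket.socketpair()
    b.settimeout(0.1)
    reader = TimeoutSafeLineReader(b)
    a.sendall(b"GET / HT")               # only part of the request line
    try:
        reader.readline()
        timed_out = False
    except socket.timeout:
        timed_out = True                 # the partial bytes stay buffered
    a.sendall(b"TP/1.1\r\nHost: x\r\n")  # the rest, plus a second line
    first = reader.readline()
    second = reader.readline()
    a.close()
    b.close()
    return timed_out, first, second


if __name__ == "__main__":
    print(demo())
```

The search restarts from the beginning of the buffer on every pass, which keeps the sketch short at the cost of rescanning; tracking a start offset, as the class above does, avoids that.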