I'm trying to parse an HTTP request line (e.g. 'GET / HTTP/1.1\r\n'). This is easy with socket.makefile().readline() (BaseHTTPRequestHandler uses it), for example:
    print(sock.makefile().readline())
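Unfortunately, as the documentation states, the socket must be in blocking mode (it cannot have a timeout) when makefile() is used. How can I implement a readline()-like function that does the same job without the makefile() file-object interface, and without reading more than necessary (since any extra bytes it reads would be discarded, and I need that data later)?
A very inefficient example: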
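    request_line = b""
    while not request_line.endswith(b"\n"):
        byte = sock.recv(1)  # one system call per byte
        if not byte:         # connection closed before a full line arrived
            break
        request_line += byte
    print(request_line)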
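Four and a half years later, I would suggest asyncio's Streams for this, but here is how you could do it properly with BytesIO.
Note that this implementation "shrinks" the in-memory BytesIO object each time a line is detected. If you don't care about that, this could be a lot fewer lines.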
    import socket
    import time
    from io import BytesIO

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', 1234))
    sock.setblocking(False)


    def handle_line(line):
        # or, print("Line Received:", line.decode().rstrip())
        print(f"Line Received: {line.decode().rstrip()!r}")


    with BytesIO() as buffer:
        while True:
            try:
                resp = sock.recv(100)       # Read in some number of bytes -- balance this
            except BlockingIOError:
                print("sleeping")           # Do whatever you want here, this just
                time.sleep(2)               # illustrates that it's nonblocking
            else:
                if not resp:                # Empty read: the peer closed the connection
                    break
                buffer.write(resp)          # Write to the BytesIO object
                buffer.seek(0)              # Set the file pointer to the SoF
                start_index = 0             # Count the number of bytes processed
                for line in buffer:
                    if not line.endswith(b"\n"):
                        break               # Partial line: keep it for the next recv
                    start_index += len(line)
                    handle_line(line)       # Do something with your line

                # If we received any newline-terminated lines, start_index is nonzero.
                # In that case, we read the remaining bytes into memory, truncate
                # the BytesIO object, reset the file pointer and re-write the
                # remaining bytes back into it. This will advance the file pointer
                # appropriately. If start_index is zero, the buffer doesn't contain
                # any newline-terminated lines, so we set the file pointer to the
                # end of the file to not overwrite bytes.
                if start_index:
                    buffer.seek(start_index)
                    remaining = buffer.read()
                    buffer.truncate(0)
                    buffer.seek(0)
                    buffer.write(remaining)
                else:
                    buffer.seek(0, 2)
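The note before the code says it could be much shorter if shrinking the BytesIO object is not a concern. One possible reading of that, offered as a sketch and not as the answer author's own variant, is to keep unprocessed bytes in a plain bytearray and split off complete lines as chunks arrive. The helper name `pop_lines` is hypothetical, and the chunks below stand in for what sock.recv() would return.

```python
def pop_lines(pending: bytearray) -> list[bytes]:
    """Remove and return every complete, newline-terminated line in pending.

    Bytes after the last newline stay in pending for the next call.
    """
    lines = []
    while True:
        end = pending.find(b"\n")
        if end == -1:
            return lines
        lines.append(bytes(pending[: end + 1]))
        del pending[: end + 1]


# Simulate data arriving from sock.recv() in arbitrary chunks.
pending = bytearray()
received = []
for chunk in (b"GET / HT", b"TP/1.1\r\nHost: exa", b"mple.com\r\n"):
    pending += chunk
    received.extend(pop_lines(pending))

print(received)
```

In a real non-blocking loop, the `pending += chunk` step would sit in the `else:` branch after a successful recv, with the same empty-read check for a closed connection.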
(The original answer was so bad that it wasn't worth keeping (I promise), but it should still be available in the edit history.)
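For comparison, the asyncio Streams approach recommended at the top of this answer might look roughly like the following. This is a minimal sketch, not code from the answer: the helper `read_request_line`, the 5-second timeout, and the throwaway local server used to make the example runnable are all assumptions made for illustration.

```python
import asyncio


async def read_request_line(reader: asyncio.StreamReader, timeout: float = 5.0) -> bytes:
    # readline() returns the bytes up to and including b"\n"; anything after
    # the newline stays buffered in the reader for later reads.
    return await asyncio.wait_for(reader.readline(), timeout)


async def demo() -> tuple[bytes, bytes]:
    results = asyncio.Queue()

    async def handle_client(reader, writer):
        line = await read_request_line(reader)
        rest = await reader.readline()  # data after the first line is not lost
        await results.put((line, rest))
        writer.close()

    # Port 0 asks the OS for any free port (an assumption for this demo only).
    server = await asyncio.start_server(handle_client, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"GET / HTTP/1.1\r\nHost: example.com\r\n")
        await writer.drain()
        line, rest = await results.get()
        writer.close()
        await writer.wait_closed()
    return line, rest


line, rest = asyncio.run(demo())
print(line, rest)
```

The timeout here is applied per read with asyncio.wait_for, so the underlying socket never needs a timeout of its own.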
Here is an implementation that does not use asyncio. It can be used as a "synchronous", socket-based replacement for asyncio.StreamReader.
    import socket
    from asyncio import IncompleteReadError  # only import the exception class


    class SocketStreamReader:
        def __init__(self, sock: socket.socket):
            self._sock = sock
            self._recv_buffer = bytearray()

        def read(self, num_bytes: int = -1) -> bytes:
            raise NotImplementedError

        def readexactly(self, num_bytes: int) -> bytes:
            buf = bytearray(num_bytes)
            pos = 0
            while pos < num_bytes:
                n = self._recv_into(memoryview(buf)[pos:])
                if n == 0:
                    raise IncompleteReadError(bytes(buf[:pos]), num_bytes)
                pos += n
            return bytes(buf)

        def readline(self) -> bytes:
            return self.readuntil(b"\n")

        def readuntil(self, separator: bytes = b"\n") -> bytes:
            if len(separator) != 1:
                raise ValueError("Only separators of length 1 are supported.")

            chunk = bytearray(4096)
            start = 0
            # Start from whatever an earlier call left in the internal buffer.
            buf = bytearray(len(self._recv_buffer))
            bytes_read = self._recv_into(memoryview(buf))
            assert bytes_read == len(buf)

            while True:
                idx = buf.find(separator, start)
                if idx != -1:
                    break

                start = len(buf)  # next time, search only the newly received bytes
                bytes_read = self._recv_into(memoryview(chunk))
                if bytes_read == 0:
                    # The peer closed the connection before the separator arrived.
                    raise IncompleteReadError(bytes(buf), None)
                buf += memoryview(chunk)[:bytes_read]

            result = bytes(buf[: idx + 1])
            # Keep the bytes after the separator for the next call.
            self._recv_buffer = b"".join(
                (memoryview(buf)[idx + 1 :], self._recv_buffer)
            )
            return result

        def _recv_into(self, view: memoryview) -> int:
            # Serve buffered bytes first; touch the socket only if more are needed.
            bytes_read = min(len(view), len(self._recv_buffer))
            view[:bytes_read] = self._recv_buffer[:bytes_read]
            self._recv_buffer = self._recv_buffer[bytes_read:]
            if bytes_read == len(view):
                return bytes_read
            bytes_read += self._sock.recv_into(view[bytes_read:])
            return bytes_read
Usage:
    reader = SocketStreamReader(sock)
    line = reader.readline()
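Since the original goal was to parse an HTTP request line, the bytes returned by readline() could then be split roughly as follows. `parse_request_line` is a hypothetical helper, not part of either answer, and it assumes the strict "method SP target SP version" form with single spaces; a real server would likely need to be more tolerant.

```python
def parse_request_line(line: bytes) -> tuple[str, str, str]:
    """Split an HTTP/1.x request line into (method, target, version)."""
    # ISO-8859-1 maps every byte to a character, so decoding cannot fail.
    text = line.decode("iso-8859-1").rstrip("\r\n")
    parts = text.split(" ")
    if len(parts) != 3:
        raise ValueError(f"malformed request line: {line!r}")
    method, target, version = parts
    if not version.startswith("HTTP/"):
        raise ValueError(f"unexpected HTTP version: {version!r}")
    return method, target, version


print(parse_request_line(b"GET / HTTP/1.1\r\n"))
```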