mjn*_*n12 5 python sockets gevent
我目前正在努力为现有的 django 项目添加对 gevent-socketio 的支持。我发现 gevent.monkey.patch_all() 调用破坏了负责从套接字接收数据的线程的取消机制,我们现在将调用SocketReadThread类。
SocketReadThread非常简单,它在阻塞套接字上调用recv()。当它接收到数据时,它会处理它并再次调用recv()。当发生异常时或线程停止时如发生时的recv()返回的0字节socket.shutdown(SHUT_RDWR)被称为在SocketReadThread.stop_reading()
当 gevent.monkey.patch_all() 替换默认套接字实现时会出现问题。我得到以下异常,而不是很好地关闭:
错误:[Errno 9] 文件描述符已在另一个 greenlet 中关闭
我假设这是因为 gevent 使我的套接字非阻塞以发挥其魔力。这意味着当我调用socket.shutdown(socket.SHUT_RDWR) 时,正在为猴子修补socket.recv调用工作的greenlet试图从关闭的文件描述符中读取。
我编写了一个示例来隔离这个问题:
from gevent import monkey
monkey.patch_all()
import socket
import sys
import threading
import time
class SocketReadThread(threading.Thread):
def __init__(self, socket):
super(SocketReadThread, self).__init__()
self._socket = socket
def run(self):
connected = True
while connected:
try:
print "calling socket.recv"
data = self._socket.recv(1024)
if (len(data) < 1):
print "received nothing, assuming socket shutdown"
connected = False
else :
print "Recieved something: {}".format(data)
except socket.timeout as e:
print "Socket timeout: {}".format(e)
connected = false
except :
ex = sys.exc_info()[1]
print "Unexpected exception occurrred: {}".format(str(ex))
raise ex
def stop_reading(self):
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()
if __name__ == '__main__':
sock = socket.socket()
sock.connect(('127.0.0.1', 4242))
st = SocketReadThread(sock)
st.start()
time.sleep(3)
st.stop_reading()
st.join()
Run Code Online (Sandbox Code Playgroud)
如果你打开一个终端并运行nc -lp 4242 &(给这个程序一些连接)然后运行这个程序你会看到上面提到的异常。如果您删除对monkey.patch_all() 的调用,您将看到它运行良好。
我的问题是:如何支持SocketReadThread 的取消,这种方式可以在有或没有 gevent 猴子补丁的情况下使用,并且不需要使用会使取消速度变慢的任意超时(即调用recv()并超时并检查有条件)?
我发现有两种不同的解决方法。第一个是简单地捕获并抑制异常。这似乎工作得很好,因为一个线程关闭套接字以导致另一个线程从阻塞读取中退出是常见的做法。我不知道也不明白为什么 greenlets 除了调试辅助之外还会抱怨这一点。这真的只是一个烦恼。
第二种选择是使用自管道技巧(快速搜索会产生许多解释)作为唤醒阻塞线程的机制。本质上,我们创建第二个文件描述符(套接字就像操作系统的一种文件描述符)用于信号取消。然后,我们使用 select 作为阻塞来等待套接字上的传入数据或取消文件描述符上的取消请求。请参阅下面的示例代码。
from gevent import monkey
monkey.patch_all()
import os
import select
import socket
import sys
import threading
import time
class SocketReadThread(threading.Thread):
def __init__(self, socket):
super(SocketReadThread, self).__init__()
self._socket = socket
self._socket.setblocking(0)
r, w = os.pipe()
self._cancelpipe_r = os.fdopen(r, 'r')
self._cancelpipe_w = os.fdopen(w, 'w')
def run(self):
connected = True
read_fds = [self._socket, self._cancelpipe_r]
while connected:
print "Calling select"
read_list, write_list, x_list = select.select(read_fds, [], [])
print "Select returned"
if self._cancelpipe_r in read_list :
print "exiting"
self._cleanup()
connected = False
elif self._socket in read_list:
print "calling socket.recv"
data = self._socket.recv(1024)
if (len(data) < 1):
print "received nothing, assuming socket shutdown"
connected = False
self._cleanup()
else :
print "Recieved something: {}".format(data)
def stop_reading(self):
print "writing to pipe"
self._cancelpipe_w.write("\n")
self._cancelpipe_w.flush()
print "joining"
self.join()
print "joined"
def _cleanup(self):
self._cancelpipe_r.close()
self._cancelpipe_w.close()
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()
if __name__ == '__main__':
sock = socket.socket()
sock.connect(('127.0.0.1', 4242))
st = SocketReadThread(sock)
st.start()
time.sleep(3)
st.stop_reading()
Run Code Online (Sandbox Code Playgroud)
同样,在运行上述程序之前,运行netcat -lp 4242 &为其提供一个要连接的侦听套接字。