无法关闭侦听套接字以从单独的线程中止accept()/select()

Pir*_*ish 1 python sockets select multithreading

我正在编写一个 Python (v3.7.3) 套接字服务器,我想为其使用阻塞 I/O。我使用select()没有超时来接受新客户以及阅读他们的信息。我可以关闭侦听套接字以中止 select(),并捕获 OSError 作为停止执行的指示。

\n\n

但是,当在单独的线程中运行时,这似乎不起作用,我不明白为什么。

\n\n

我知道还有其他方法可以实现此目的,例如使用超时、使用 select() 的虚拟套接字或与侦听器建立虚拟连接以唤醒它。但这些都在某种程度上违背了使用 select() 的目的,并且在单线程中运行时没有必要。

\n\n

这是重现该问题的基本示例,在我的实际代码中,仅代表多个线程中的一个(因此,我首先使用线程):

\n\n
#!/usr/bin/env python3\n\nimport signal\nimport socket\nimport threading\n\n\nclass SocketCloseTest:\n    """Simple test case for using socket.close() to abort select.select()"""\n\n    def __init__(self, port, address=None):\n        self.port = port\n        self.address = address or \'\'\n\n        self.socket = None\n\n    def stop(self):\n        """Close listening socket to stop select.select()"""\n\n        if self.socket:\n            print("Closing listener", self.socket)\n            self.socket.close()\n            print("Listener closed", self.socket)\n\n    def threaded_run(self):\n        """Run test in a separate thread"""\n\n        thread = threading.Thread(target=self.run)\n        print("Starting sub-thread")\n        thread.start()\n        thread.join()\n        print("Sub-thread ended")\n\n    def run(self):\n        """Run test"""\n\n        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\n        # Reuse port for quick re-launch of the application\n        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\n        self.socket.bind((self.address, self.port))\n        print("Starting listener")\n        self.socket.listen()\n\n        try:\n            print("select() started")\n            r, w, e = select.select([self.socket], [], [])\n        except OSError:\n            print("select() aborted")\n        else:\n            print("select() completed")\n\n\nif __name__ == \'__main__\':\n    tester = SocketCloseTest(5000, address=\'\')\n\n    # Set up signal handler for Ctrl-C\n    def signal_handler(signum, frame):\n        print("Received signal {}".format(signum))\n        tester.stop()\n    signal.signal(signal.SIGINT, signal_handler)\n\n    # This works\n    tester.run()\n\n    # This doesn\'t work\n    # tester.threaded_run()\n\n    print("Main thread ended")\n
Run Code Online (Sandbox Code Playgroud)\n\n

使用时test.run(),它按预期运行并导致以下结果:

\n\n
\n

正在启动侦听器
\n 选择已启动
\n ^C接收信号2
\n 正在关闭侦听器 <socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'0.0.0.0\' , 5000)>
\n 侦听器已关闭 <socket.socket [close] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
\n 选择已取消
\n 主线程结束

\n
\n\n

然而,当使用 运行时tester.threaded_run(),它只是挂在对 select() 的调用应该中止的地方。奇怪的是,此时将作业置于后台会导致代码按其应有的方式继续:

\n\n
\n

启动子线程
\n启动侦听器
\n选择启动
\n^C接收信号2
\n关闭侦听器<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\ '0.0.0.0\', 5000)>
\n 侦听器已关闭 <socket.socket [close] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
\n --Ctrl-Z 按下在这里暂停 shell 中的工作--

\n\n

$ bg
\n --Shell 在后台报告作业 --
\n 选择取消
\n 子线程结束
\n 主线程结束

\n
\n\n

谢谢\xe2\x80\xa6

\n\n
    \n
  • 编辑提到accept()患有完全相同的症状。
  • \n
\n

pil*_*row 6

close()在多线程情况下不会做你想做的事。请改用您描述的其他机制之一。

在单线程情况下,控制返回到select(),它重新启动并注意到现在已解除的文件描述符上的 EBADF。(当然,这是非常危险的,因为 fd #3 可能随时被任何其他线程甚至复杂的信号处理程序回收,尽管您的玩具程序看起来很安全。)在多线程情况下,只是不这样close()做t 唤醒你的select()ing 线程。

Python文档警告

注意: close()释放与连接关联的资源,但不一定立即关闭连接。如果您想及时关闭连接,请shutdown()在 之前调用close()

事实上,这是一个比较棘手且依赖平台的问题。摘录于德高望重的 Dobb 博士 2008 年的一篇文章

在某些操作系统上,[ shutdown()而不是close() ] 也是唯一可行的解​​决方案:在 FreeBSD 上,不带shutdown () 的 close( )不会唤醒在read()select()中等待的进程中等待的进程。 ...

另一个需要考虑的问题是,通过shutdown()close()关闭可能不会被视为操作系统中的读取事件......

但是,shutdown()仅适用于已建立连接的套接字,不适用于侦听新连接的套接字或其他类型的文件描述符......

(无论如何,在我的系统上, ashutdown() 确实会唤醒select()ing 线程。)