iMa*_*ath 2 python pyqt pyqt5 python-asyncio aiohttp
我打算在 PyQt 应用程序中嵌入 aiohttp 服务器,但是当我运行下面的代码时,Qt 窗口无法显示,我知道这是由 引起的web.run_app(app)
,我尝试将其移动到线程中,但是然后我明白了RuntimeError: There is no current event loop in thread \'Dummy-1\'
,那我该怎么办?我发现asyncqt可能会有所帮助,但我不知道如何使用它来处理 aiohttp 服务器。
from PyQt5.QtCore import *\nfrom PyQt5.QtGui import *\nfrom PyQt5.QtWidgets import *\nfrom aiohttp import web\n\n\nclass ThreadGo(QThread): # threading.Thread\n # implementing new slots in a QThread subclass is error-prone and discouraged.\n\n def __init__(self, parent, func, *args, **kwargs):\n super().__init__(parent)\n self.func = func\n self.args = args\n self.kwargs = kwargs\n self.result = 0\n\n onFinished = self.kwargs.get(\'onFinished\')\n self.finished.connect(onFinished) if onFinished else None # \xe7\x94\xa8lambda\xe8\xbf\x98\xe4\xb8\x8d\xe8\xa1\x8c\xe5\x91\xa2\n\n self.finished.connect(self.deleteLater)\n self.start()\n\n def run(self):\n\n self.result = self.func(*self.args) # deleteLater\n\n\nclass Window(QMainWindow):\n\n def __init__(self, parent=None, **kwargs):\n super().__init__(parent, **kwargs)\n self.setUpHTTPServer()\n\n def setUpHTTPServer(self):\n async def hello(request):\n return web.Response(text="Hello, world")\n app = web.Application()\n app.add_routes([web.get(\'/\', hello)])\n web.run_app(app)\n # ThreadGo(self, lambda:web.run_app(app))#get RuntimeError: There is no current event loop in thread \'Dummy-1\n\n\nif __name__ == "__main__":\n from sys import argv, exit\n\n a = QApplication(argv)\n w = Window()\n w.show()\n exit(a.exec_())\n
Run Code Online (Sandbox Code Playgroud)\n
在下面的示例中,我展示了如何将 Qt 与 aiohttp 服务器一起使用:
import asyncio
from functools import cached_property
from PyQt5.QtWidgets import QApplication, QMainWindow
from asyncqt import QEventLoop
from aiohttp import web
class Window(QMainWindow):
def __init__(self, parent=None, **kwargs):
super().__init__(parent, **kwargs)
self.setup_server()
def setup_server(self):
self.app.add_routes([web.get("/", self.hello)])
@cached_property
def app(self):
return web.Application()
async def hello(self, request):
return web.Response(text="Hello, world")
def run(self):
web.run_app(self.app)
def main():
import sys
a = QApplication(sys.argv)
loop = QEventLoop(a)
asyncio.set_event_loop(loop)
w = Window()
w.show()
w.run()
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
446 次 |
最近记录: |