在 PyQt 应用程序中嵌入 aiohttp 服务器

iMa*_*ath 2 python pyqt pyqt5 python-asyncio aiohttp

我打算在 PyQt 应用程序中嵌入 aiohttp 服务器,但是当我运行下面的代码时,Qt 窗口无法显示。我知道这是由 web.run_app(app) 引起的。我尝试将其移到线程中运行,但随后得到了 RuntimeError: There is no current event loop in thread 'Dummy-1'。那我该怎么办?我发现 asyncqt 可能会有所帮助,但我不知道如何将它与 aiohttp 服务器一起使用。

\n\n
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from aiohttp import web


class ThreadGo(QThread):  # threading.Thread
    # implementing new slots in a QThread subclass is error-prone and discouraged.

    def __init__(self, parent, func, *args, **kwargs):
        super().__init__(parent)
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.result = 0

        onFinished = self.kwargs.get('onFinished')
        self.finished.connect(onFinished) if onFinished else None  # 用lambda还不行呢

        self.finished.connect(self.deleteLater)
        self.start()

    def run(self):

        self.result = self.func(*self.args)  # deleteLater


class Window(QMainWindow):

    def __init__(self, parent=None, **kwargs):
        super().__init__(parent, **kwargs)
        self.setUpHTTPServer()

    def setUpHTTPServer(self):
        async def hello(request):
            return web.Response(text="Hello, world")
        app = web.Application()
        app.add_routes([web.get('/', hello)])
        web.run_app(app)
        # ThreadGo(self, lambda:web.run_app(app))#get RuntimeError: There is no current event loop in thread 'Dummy-1


if __name__ == "__main__":
    from sys import argv, exit

    a = QApplication(argv)
    w = Window()
    w.show()
    exit(a.exec_())
Run Code Online (Sandbox Code Playgroud)\n

eyl*_*esc 5

在下面的示例中,我展示了如何将 Qt 与 aiohttp 服务器一起使用:

import asyncio
from functools import cached_property

from PyQt5.QtWidgets import QApplication, QMainWindow

from asyncqt import QEventLoop

from aiohttp import web


class Window(QMainWindow):
    """Main window that owns an aiohttp application and can serve it.

    The route table is registered during construction; call :meth:`run`
    to start serving.
    """

    def __init__(self, parent=None, **kwargs):
        """Initialise the Qt window, then register the HTTP routes."""
        super().__init__(parent, **kwargs)
        self.setup_server()

    @cached_property
    def app(self):
        """The ``web.Application``; created on first access, then reused."""
        application = web.Application()
        return application

    async def hello(self, request):
        """Answer a request with the plain-text body ``Hello, world``."""
        response = web.Response(text="Hello, world")
        return response

    def setup_server(self):
        """Attach the ``GET /`` route, handled by :meth:`hello`, to the app."""
        application = self.app
        application.add_routes([web.get("/", self.hello)])

    def run(self):
        """Serve the application through ``web.run_app``.

        NOTE(review): this relies on ``run_app`` using the event loop that
        the caller installed with ``asyncio.set_event_loop``. Newer aiohttp
        releases may create their own loop unless ``loop=`` is passed --
        confirm against the installed aiohttp version.
        """
        web.run_app(self.app)


def main():
    """Create the Qt application and window, then serve the aiohttp app."""
    import sys

    qt_app = QApplication(sys.argv)

    # Install the asyncqt loop built around the QApplication as asyncio's
    # current event loop before anything asks asyncio for a loop.
    qt_loop = QEventLoop(qt_app)
    asyncio.set_event_loop(qt_loop)

    window = Window()
    window.show()

    # Hands control to Window.run(), which calls web.run_app.
    window.run()


# Start the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)