Python multiprocessing with async functions

Ere*_*evi 4 websocket python-3.x async-await python-asyncio python-multiprocessing

I built a websocket server; a simplified version is shown below:

import websockets, subprocess, asyncio, json, re, os, sys
from multiprocessing import Process

def docker_command(command_words):
    return subprocess.Popen(
        ["docker"] + command_words,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)

async def check_submission(websocket:object, submission:dict):
    exercise=submission["exercise"]
    with docker_command(["exec", "-w", "badkan", "grade_exercise", exercise]) as proc:
        for line in proc.stdout:
            print("> " + line)
            await websocket.send(line)

async def run(websocket, path):
    submission_json = await websocket.recv()   # returns a string
    submission = json.loads(submission_json)   # converts the string to a python dict

    ####
    await check_submission(websocket, submission)


websocketserver = websockets.server.serve(run, '0.0.0.0', 8888, origins=None)
asyncio.get_event_loop().run_until_complete(websocketserver)
asyncio.get_event_loop().run_forever()

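It works fine when there is only one user at a time. However, when several users try to use the server, it handles them serially, so later users have to wait a long time.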

I tried replacing the line marked "####" ("await check_submission...") with:

p = Process(target=check_submission, args=(websocket, submission,))
p.start()

However, that does not work: I get a RuntimeWarning, "coroutine 'check_submission' was never awaited", and I see no output coming through the websocket.

I also tried replacing these lines with:

loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
await loop.run_in_executor(None, check_submission, websocket, submission)

But that produced a different error: "can't pickle asyncio.Future objects".

How can I structure this multiprocessing websocket server?

小智 10

Here is my example. asyncio.run() worked for me for launching an async function from multiple processes:

import asyncio
import traceback
from multiprocessing import Process

# Base, FlowExecutor, FlowResult and FLOW_EXCEPT come from the answerer's
# own project and are not shown here.

class FlowConsumer(Base):
    def __init__(self):
        pass

    async def run(self):
        self.logger("start consumer process")
        while True:
            # get flow from queue
            flow = {}
            # call flow executor get result
            executor = FlowExecutor(flow)
            rtn = FlowResult()
            try:
                rtn = await executor.run()
            except Exception as e:
                self.logger("flow run except:{}".format(traceback.format_exc()))
                rtn.status = FLOW_EXCEPT
                rtn.msg = str(e)
            self.logger("consumer flow finish, result:{}".format(rtn.dict()))
            # non-blocking pause between iterations
            await asyncio.sleep(1)

    def process(self):
        # synchronous entry point for Process: drives the coroutine
        # in a fresh event loop inside the child process
        asyncio.run(self.run())


processes = []
consumer_proc_count = 3

# start multi consumer processes
for _ in range(consumer_proc_count):
    # old version
    # p = Process(target=FlowConsumer().run)
    p = Process(target=FlowConsumer().process)
    p.start()
    processes.append(p)

for p in processes:
    p.join()
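For readers who want to try the pattern without the project-specific classes, here is a minimal self-contained sketch of the same idea: Process is given a plain synchronous function, and that function calls asyncio.run() to drive the coroutine in the child's own event loop. The names grade, worker and run_jobs are hypothetical and not from the post, and asyncio.sleep stands in for real async work. Objects tied to the parent's event loop (such as the websocket in the question) are not passed to the child, since they cannot be pickled; only a plain job id and a multiprocessing.Queue for results cross the process boundary.

```python
import asyncio
import multiprocessing as mp


async def grade(job_id: int) -> str:
    """Hypothetical stand-in for real async work."""
    await asyncio.sleep(0.1)
    return f"job {job_id} done"


def worker(job_id: int, results) -> None:
    """Synchronous entry point for Process.

    Passing the coroutine function itself as the target would only create
    a coroutine object that is never awaited, so the coroutine is run here
    with asyncio.run(), which creates a new event loop in the child.
    """
    results.put(asyncio.run(grade(job_id)))


def run_jobs(job_ids):
    """Start one process per job and collect the results via a queue."""
    results = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, results)) for i in job_ids]
    for p in procs:
        p.start()
    # Drain the queue before joining so children are not blocked on put().
    out = sorted(results.get() for _ in procs)
    for p in procs:
        p.join()
    return out


if __name__ == "__main__":
    print(run_jobs([1, 2, 3]))
```

The `if __name__ == "__main__":` guard matters on platforms where child processes are started by re-importing the main module. In a server like the one in the question, the parent would still need to read results from the queue and forward them over the websocket itself.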