zeh*_*zeh 7 python-multithreading python-3.x async-await python-asyncio
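I want to run a service that requests URLs using coroutines and multithreading. However, I cannot pass coroutines to the workers in the executor. See the code below for a minimal example of the issue: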
    import time
    import asyncio
    import concurrent.futures

    EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=5)

    async def async_request(loop):
        await asyncio.sleep(3)

    def sync_request(_):
        time.sleep(3)

    async def main(loop):
        futures = [loop.run_in_executor(EXECUTOR, async_request, loop)
                   for x in range(10)]
        await asyncio.wait(futures)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
This results in the following error:
    Traceback (most recent call last):
      File "co_test.py", line 17, in <module>
        loop.run_until_complete(main(loop))
      File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
        return future.result()
      File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
        raise self._exception
      File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
        result = coro.send(None)
      File "co_test.py", line 10, in main
        futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
      File "co_test.py", line 10, in <listcomp>
        futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
      File "/usr/lib/python3.5/asyncio/base_events.py", line 541, in run_in_executor
        raise TypeError("coroutines cannot be used with run_in_executor()")
    TypeError: coroutines cannot be used with run_in_executor()
I know I can use the sync_request function instead of async_request. In that case I would effectively get a coroutine by sending the blocking function to another thread.
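For reference, that first option can be sketched roughly as follows. This is only an illustration and assumes Python 3.7+ (for asyncio.run and asyncio.get_running_loop); the `delay` parameter and the `run_blocking` and `demo` helpers are made-up names, not part of the code above.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def sync_request(delay):
    # Stand-in for a blocking HTTP call; `delay` is a hypothetical parameter.
    time.sleep(delay)
    return delay


async def run_blocking(executor, delay, count):
    loop = asyncio.get_running_loop()
    # Each call hands the blocking function to a worker thread and returns
    # a future that the event loop can await.
    futures = [loop.run_in_executor(executor, sync_request, delay)
               for _ in range(count)]
    return await asyncio.gather(*futures)


def demo(delay=0.1, count=5):
    # One worker per call, so the blocking sleeps overlap.
    with ThreadPoolExecutor(max_workers=count) as executor:
        start = time.perf_counter()
        results = asyncio.run(run_blocking(executor, delay, count))
        elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    print(demo())
```

With one worker per call, the total time should be close to a single `delay` rather than `count` times that.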
I also know I can call async_request ten times in the event loop, something like the code below:
    loop = asyncio.get_event_loop()
    futures = [async_request(loop) for i in range(10)]
    loop.run_until_complete(asyncio.wait(futures))
But in this case I would be using a single thread.
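To make the single-thread point concrete, here is a small sketch, assuming Python 3.7+. The `delay` parameter and the `run_all` and `demo` helpers are hypothetical names used only for this illustration. Every coroutine reports the thread it ran on, and the sleeps still overlap.

```python
import asyncio
import threading
import time


async def async_request(delay):
    # Non-blocking sleep; returns the name of the thread that ran it.
    await asyncio.sleep(delay)
    return threading.current_thread().name


async def run_all(delay, count):
    # All coroutines are scheduled on the same event loop.
    return await asyncio.gather(*(async_request(delay) for _ in range(count)))


def demo(delay=0.1, count=10):
    start = time.perf_counter()
    thread_names = asyncio.run(run_all(delay, count))
    elapsed = time.perf_counter() - start
    # The set collapses to a single entry when one thread ran everything.
    return set(thread_names), elapsed


if __name__ == "__main__":
    print(demo())
```

The set of thread names should contain a single entry, and the elapsed time should be roughly one `delay`, since the waits are interleaved on one thread rather than run in parallel.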
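How can I combine both scenarios, with coroutines working across multiple threads? As you can see in the code, I am passing (and not using) the pool to async_request, in the hope that I can code something that tells the worker to make a future, send it to the pool, and asynchronously (freeing the worker) wait for the result.
The reason I want to do this is to make the application scalable. Is it an unnecessary step? Should I simply have one thread per URL and leave it at that? Something like: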
    LEN = len(list_of_urls)
    EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=LEN)
Is that good enough?
You have to create and set a new event loop in the thread context in order to run coroutines:
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    def run(corofn, *args):
        loop = asyncio.new_event_loop()
        try:
            coro = corofn(*args)
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    async def main():
        loop = asyncio.get_event_loop()
        executor = ThreadPoolExecutor(max_workers=5)
        futures = [
            loop.run_in_executor(executor, run, asyncio.sleep, 1, x)
            for x in range(10)]
        print(await asyncio.gather(*futures))
        # Prints: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
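On Python 3.7 or later, the same idea can probably be written more compactly with asyncio.run, which creates a fresh event loop, runs the coroutine to completion, and closes the loop. The sketch below is a variation on the answer above, not a replacement for it; `run_in_thread` and the `count` and `delay` parameters are names invented for the example.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_in_thread(corofn, *args):
    # asyncio.run creates a new event loop in the calling (worker) thread,
    # runs the coroutine until it finishes, then closes the loop.
    return asyncio.run(corofn(*args))


async def main(count=10, delay=0.1):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            loop.run_in_executor(executor, run_in_thread, asyncio.sleep, delay, x)
            for x in range(count)]
        # gather returns results in submission order
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

As in the original version, each worker thread is still blocked while its own loop runs a single coroutine, so at most `max_workers` coroutines make progress at once.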
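From what I understand of the question, you are trying to use each thread to start coroutines and then stay free for more work while their results are awaited asynchronously.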
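However, as soon as you call the loop (whether the main loop or a new one) to wait for results, it blocks the waiting thread.
Also, when run_in_executor is used with a bunch of synchronous functions, a thread does not actually know whether there are more coroutines to dispatch in one go before it reaches the point where it waits on the loop.
I think that if you want to dispatch a bunch of coroutines so that each thread manages its own group of coroutines in its own event loop, the following code does it: multiple threads wait on 10 async sleeps of 1 second each, for a total time of about 1 second.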
    import asyncio
    import threading
    from asyncio import AbstractEventLoop
    from concurrent.futures import ThreadPoolExecutor
    from time import perf_counter
    from typing import Dict, Set

    # one event loop per worker thread, keyed by thread id
    event_loops_for_each_thread: Dict[int, AbstractEventLoop] = {}

    def run(corofn, *args):
        curr_thread_id = threading.current_thread().ident
        if curr_thread_id not in event_loops_for_each_thread:
            event_loops_for_each_thread[curr_thread_id] = asyncio.new_event_loop()
        thread_loop = event_loops_for_each_thread[curr_thread_id]
        coro = corofn(*args)
        # schedule the coroutine on this thread's loop without running it yet
        return thread_loop.create_task(coro)

    async def async_gather_tasks(all_tasks: Set[asyncio.Task]):
        return await asyncio.gather(*all_tasks)

    def wait_loops():
        # each thread blocks here, waiting for all async calls on its own loop
        curr_thread_id = threading.current_thread().ident
        threads_event_loop = event_loops_for_each_thread.get(curr_thread_id)
        if threads_event_loop is None:
            # this worker never received a coroutine, so it has nothing to wait for
            return []
        # printed to show that each thread is waiting on its own loop
        print(f'Thread {curr_thread_id} will wait its tasks.')
        return threads_event_loop.run_until_complete(
            async_gather_tasks(asyncio.all_tasks(threads_event_loop)))

    async def main():
        loop = asyncio.get_event_loop()
        max_workers = 5
        executor = ThreadPoolExecutor(max_workers=max_workers)
        # dispatch the async tasks to the worker threads
        futures = [
            loop.run_in_executor(executor, run, asyncio.sleep, 1, x)
            for x in range(10)]
        # wait until the threads have finished dispatching to their own event loops
        await asyncio.wait(futures)
        # at this point the tasks are scheduled on each thread's event loop;
        # below, each worker thread is told to wait for all of its tasks to complete
        # (the executor does not guarantee that every worker gets exactly one call)
        futures = [
            loop.run_in_executor(executor, wait_loops)
            for _ in range(max_workers)
        ]
        print(await asyncio.gather(*futures))
        # it will print something like:
        # [[1, 8], [0], [6, 3, 9, 7], [4], [2, 5]]
        # each sub-list holds the results of one thread's tasks
        # it is non-deterministic, so each run returns a different list of lists

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        start = perf_counter()
        loop.run_until_complete(main())
        end = perf_counter()
        duration_s = end - start
        # the duration shows that the threads waited on their tasks asynchronously
        print(f'duration_s={duration_s:.3f}')
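A different way to get coroutines running off the main thread, not taken from the answers above, is to keep one long-lived event loop in a background thread and hand coroutines to it with asyncio.run_coroutine_threadsafe. The sketch below assumes Python 3.7+; `start_background_loop`, `stop_background_loop`, and `demo` are hypothetical helper names.

```python
import asyncio
import threading


def start_background_loop():
    # Create a loop and run it forever in a dedicated daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def stop_background_loop(loop, thread):
    # loop.stop must be scheduled from inside the loop's own thread.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


def demo(count=10, delay=0.1):
    loop, thread = start_background_loop()
    try:
        # Each call returns a concurrent.futures.Future tied to the coroutine.
        futures = [
            asyncio.run_coroutine_threadsafe(asyncio.sleep(delay, x), loop)
            for x in range(count)]
        # Block the calling thread until every coroutine has finished.
        return [f.result(timeout=5) for f in futures]
    finally:
        stop_background_loop(loop, thread)


if __name__ == "__main__":
    print(demo())
```

Here the calling thread never blocks while submitting work; it only blocks when it asks for results. Starting several such loops, one per thread, would be one way to spread coroutines across threads, at the cost of deciding yourself which loop receives which coroutine.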