bru*_*oop 21 python python-3.x python-asyncio
我正在使用Python3 Asyncio模块来创建负载平衡应用程序.我有两个繁重的IO任务:
这两个进程将永远运行,彼此独立,不应被另一个进程阻塞.
我不能使用1个事件循环因为它们会阻塞彼此,有没有办法有2个事件循环或者我是否必须使用多线程/处理?
我尝试使用asyncio.new_event_loop(),但还没有设法让它工作.
bru*_*oop 22
回答我自己的问题发布我的解决方案:
我最终做的是在轮询内为轮询模块创建一个线程和一个新的事件循环,所以现在每个模块都在不同的循环中运行.它不是一个完美的解决方案,但它是唯一对我有意义的解决方案(我想避免线程,但因为它只有一个......).例:
import asyncio
import threading
def worker():
second_loop = asyncio.new_event_loop()
execute_polling_coroutines_forever(second_loop)
return
threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
loop = asyncio.get_event_loop()
execute_proxy_coroutines_forever(loop)
Run Code Online (Sandbox Code Playgroud)
Asyncio要求每个循环在同一个线程中运行其协同程序.使用这个方法你有一个事件循环foreach线程,它们是完全独立的:每个循环将在它自己的线程上执行它的协同程序,所以这不是问题.正如我所说,它可能不是最好的解决方案,但它对我有用.
cha*_*eng 22
尽管在大多数情况下,您在使用时不需要运行多个事件循环asyncio,但人们不应该假设他们的假设适用于所有情况,或者只是给您他们认为更好的东西而不直接针对您的原始问题。
以下演示了如何在线程中创建新的事件循环。与您自己的答案相比,该set_event_loop技巧可以帮助您避免loop每次执行基于异步的操作时都传递对象。
import asyncio
import threading
async def print_env_info_async():
# As you can see each work thread has its own asyncio event loop.
print(f"Thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}")
async def work():
while True:
await print_env_info_async()
await asyncio.sleep(1)
def worker():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
new_loop.run_until_complete(work())
return
number_of_threads = 2
for _ in range(number_of_threads):
threading.Thread(target=worker).start()
Run Code Online (Sandbox Code Playgroud)
理想情况下,您希望将繁重的工作放在工作线程中,并让 asncyio 线程尽可能轻地运行。将 asyncio 线程视为桌面或移动应用程序的 GUI 线程,您不想阻止它。工作线程通常非常繁忙,这是您不想在工作线程中创建单独的异步事件循环的原因之一。下面是如何使用单个异步事件循环管理繁重工作线程的示例。这是此类用例中最常见的做法:
import asyncio
import concurrent.futures
import threading
import time
def print_env_info(source_thread_id):
# This will be called in the main thread where the default asyncio event loop lives.
print(f"Thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}, source thread: {source_thread_id}")
def work(event_loop):
while True:
# The following line will fail because there's no asyncio event loop running in this worker thread.
# print(f"Thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}")
event_loop.call_soon_threadsafe(print_env_info, threading.get_ident())
time.sleep(1)
async def worker():
print(f"Thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}")
loop = asyncio.get_running_loop()
number_of_threads = 2
executor = concurrent.futures.ThreadPoolExecutor(max_workers=number_of_threads)
for _ in range(number_of_threads):
asyncio.ensure_future(loop.run_in_executor(executor, work, loop))
loop = asyncio.get_event_loop()
loop.create_task(worker())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
kis*_*rgy 13
整个要点asyncio是,您可以同时运行数千个I / O繁重的任务,因此根本不需要Threads,这正是要这样做的原因asyncio。只需在同一循环中运行两个协程(SNMP和代理)即可。您必须在调用之前使它们都可用于事件循环loop.run_forever()。像这样:
import asyncio
async def snmp():
print("Doing the snmp thing")
await asyncio.sleep(1)
async def proxy():
print("Doing the proxy thing")
await asyncio.sleep(2)
async def main():
while True:
await snmp()
await proxy()
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
我不知道您的代码的结构,因此不同的模块可能具有自己的无限循环或某种东西,在这种情况下,您可以运行如下代码:
import asyncio
async def snmp():
while True:
print("Doing the snmp thing")
await asyncio.sleep(1)
async def proxy():
while True:
print("Doing the proxy thing")
await asyncio.sleep(2)
loop = asyncio.get_event_loop()
loop.create_task(snmp())
loop.create_task(proxy())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
请记住,两者snmp和proxy都必须async def是以异步感知的方式编写的协程()。asyncio不会使简单的阻止Python函数突然“异步”。
在您的特定情况下,我怀疑您有点困惑(没有冒犯!),因为编写良好的异步模块永远不会在同一循环中互相阻塞。如果是这种情况,则根本不需要asyncio,只需要单独运行其中一个即可,Thread而无需处理任何asyncio东西。
I know it's an old thread but it might be still helpful for someone. I'm not good in asyncio but here is a bit improved solution of @kissgyorgy answer. Instead of awaiting each closure separately we create list of tasks and fire them later (python 3.9):
import asyncio
async def snmp():
while True:
print("Doing the snmp thing")
await asyncio.sleep(0.4)
async def proxy():
while True:
print("Doing the proxy thing")
await asyncio.sleep(2)
async def main():
tasks = []
tasks.append(asyncio.create_task(snmp()))
tasks.append(asyncio.create_task(proxy()))
await asyncio.gather(*tasks)
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
Result:
Doing the snmp thing
Doing the proxy thing
Doing the snmp thing
Doing the snmp thing
Doing the snmp thing
Doing the snmp thing
Doing the proxy thing
Run Code Online (Sandbox Code Playgroud)
Asyncio 事件循环是一个单线程运行,它不会并行运行任何东西,这就是它的设计方式。我能想到的最接近的事情是使用asyncio.wait.
from asyncio import coroutine
import asyncio
@coroutine
def some_work(x, y):
print("Going to do some heavy work")
yield from asyncio.sleep(1.0)
print(x + y)
@coroutine
def some_other_work(x, y):
print("Going to do some other heavy work")
yield from asyncio.sleep(3.0)
print(x * y)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([asyncio.async(some_work(3, 4)),
asyncio.async(some_other_work(3, 4))]))
loop.close()
Run Code Online (Sandbox Code Playgroud)
另一种方法是使用asyncio.gather()- 它从给定的 future 列表中返回未来的结果。
tasks = [asyncio.Task(some_work(3, 4)), asyncio.Task(some_other_work(3, 4))]
loop.run_until_complete(asyncio.gather(*tasks))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
16916 次 |
| 最近记录: |