geo*_*111 6 python multithreading coroutine
我正在构建一个网络抓取 API,大部分抓取都是通过 AsyncIO 协程完成的,如下所示:
async def parse_event(self):
do scraping
# call the func
asyncio.run(b.parse_event())
Run Code Online (Sandbox Code Playgroud)
这工作得很好,但是当我同时抓取concurrent.futures.ThreadPoolExecutor多个网站时,我首先使用多个线程来抓取。但由于我已经实现了协程逻辑,所以我现在无法asyncio.run直接在线程中使用该方法。
之前(没有协程):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
w1_future = executor.submit(self.w1.parse_event)
w2_future = executor.submit(self.w2.parse_event)
w3_future = executor.submit(self.w3.parse_event)
Run Code Online (Sandbox Code Playgroud)
之后,我预计会出现如下所示的情况
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
w1_future = executor.submit(asyncio.run(self.w1.parse_event))
w2_future = executor.submit(asyncio.run(self.w2.parse_event))
w3_future = executor.submit(asyncio.run(self.w3.parse_event))
Run Code Online (Sandbox Code Playgroud)
不幸的是它不起作用。
Mis*_*agi 14
和asyncio线程都是使用单核进行并发操作的一种手段。然而,这通过不同的机制起作用:asyncio使用/的协作并发,而线程使用GIL的抢占式并发。
混合两者不会加快执行速度,因为两者仍然使用相同的单核;相反,两种机制的开销都会减慢程序速度,并且机制的交互会使编写正确的代码变得复杂。asyncawait
要实现多个任务之间的并发,请将它们全部提交到单个asyncio事件循环。相当于executor.submit是asyncio.create_task; 可以使用 一次提交多个任务asyncio.gather。请注意,两者都是在循环内部调用,而不是在执行器外部调用。
async def parse_all():
return await asyncio.gather(
# all the parsing tasks that should run concurrently
self.w1.parse_event,
self.w2.parse_event,
self.w3.parse_event,
)
asyncio.run(parse_all())
Run Code Online (Sandbox Code Playgroud)
如果您绝对希望为每个解析使用单独的线程事件循环,则必须使用executor.submit(func, *args)而不是executor.submit(func(args)).
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
w1_future = executor.submit(asyncio.run, self.w1.parse_event())
w2_future = executor.submit(asyncio.run, self.w2.parse_event())
w3_future = executor.submit(asyncio.run, self.w3.parse_event())
Run Code Online (Sandbox Code Playgroud)
请注意,混合asyncio和线程化增加了复杂性和约束。您可能希望使用调试模式来检测某些线程和上下文安全问题。然而,线程和上下文安全约束/保证通常没有记录;如果需要,手动测试或检查操作的安全性。