将线程与 asyncio 结合使用

age*_*ith 7 python multithreading python-asyncio

我正在寻找一种方法来生成不同的线程(在我的实际程序中,线程数在执行过程中可以改变)来执行无限运行的操作,这会在运行期间阻塞我的整个应用程序(最坏的情况是)几秒钟。
因此,我使用标准线程类asyncio(因为我的程序的其他部分正在使用它)。

这似乎工作得很好,根据这个线程,它似乎没问题,但是当搜索异步线程asyncio时,我经常会偶然发现使用的建议ProcessPoolExecutor(例如在这个stackoverflow 帖子中)。 现在我想知道,以下方法是否真的是好的做法(甚至是危险的)?

class Scanner:
  def __init__(self):
    # Start a new Scanning Thread
    self.scan_thread = Thread(target=self.doScan, args=())
    self.scan_thread.start()

  def doScan(self):
    print("Started scanning")
    loop = asyncio.new_event_loop()
    loop.run_until_complete(self.connection())
    print("Stopped scanning")

list_of_scanner = []
list_of_scanner.append(Scanner())
list_of_scanner.append(Scanner())
Run Code Online (Sandbox Code Playgroud)

背景:我自己开始质疑这一点,因为我的程序在生成线程时开始崩溃,主要是出现错误消息 RuntimeError: Task <Task pending ...> attached to a different loop。我知道这与我给您的示例没有直接联系,但我想我开始通过使用这些线程搞乱我的异步协程。


编辑

为了澄清起见,我想补充一下,为什么我使用asyncioand的这种奇怪的构造threads

  1. 我正在使用项目hbldh/bleak
    的这一部分作为线程运行的部分基本上是这样的:
    async def connection():
      x = await client.is_connected()
      async with BleakClient(address, loop=loop) as client:
        while x:
          x = await client.is_connected()
          log.info("Connected: {0}".format(x))
    
    Run Code Online (Sandbox Code Playgroud)
  2. endlessScan()做什么?这个名称有点误导,并且在我的代码中被称为不同的(我现在已经更改了)。新名称是“connection()
    整个目的是建立到蓝牙设备的链接并基本上侦听传入的数据(就像我们使用套接字时所做的那样)”这意味着永远不会loop.run_until_complete(self.connection())退出,除非蓝牙设备断开连接。
  3. 为什么我无法创建一个事件循环?
    如前所述,当建立链接时,该函数会无限运行。每个连接的设备都运行这样的无限循环。我想在后台执行此操作。我的主应用程序永远不必等待例程完成,并且必须在所有情况下都能做出响应。threads对我来说,这证明了与以下组合的使用是合理的asyncio

编辑2:根据@user4815162342建议添加了我的测试代码。执行看起来效果很好。

    import asyncio
    from threading import Thread, Event, Lock
    import random
    class Scanner:
        def __init__(self, id, loop):
            print("INIT'D %s" % id)
            self.id = id
            self.submit_async(self.update_raw_data(), loop)
            self.raw_data = ""
            self.event = Event()
            self.data_lock = Lock()
    
        @property
        def raw_data(self):
            with self.data_lock:
                return self._raw_data
        @raw_data.setter
        def raw_data(self, raw_data):
            self._raw_data = raw_data
    
        def submit_async(self, awaitable, loop):
            return asyncio.run_coroutine_threadsafe(awaitable, loop)
    
        async def update_raw_data(self):
            while True:
                with self.data_lock:
                    self._raw_data = random.random()
                    print("Waken up %s with %s" % (self.id, self._raw_data))
                await asyncio.sleep(self.id)
    
    def _start_async():
        loop = asyncio.new_event_loop()
         t = Thread(target=loop.run_forever)
         t.daemon = True
         t.start()
        return loop
    _loop = _start_async()
    
    def stop_async():
        _loop.call_soon_threadsafe(_loop.stop)
    
    ble_devices = [Scanner(1, _loop), Scanner(2, _loop), Scanner(4, _loop)]
    
    # This code never executes...
    for dev in ble_devices:
        print(dev.raw_data)

Run Code Online (Sandbox Code Playgroud)

use*_*342 11

我建议在后台线程中创建一个事件循环并让它满足您的所有异步需求。你的协程永远不会结束并不重要;asyncio 完全能够并行执行多个此类函数。

例如:

def _start_async():
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever).start()
    return loop

_loop = start_async()

# Submits awaitable to the event loop, but *doesn't* wait for it to
# complete. Returns a concurrent.futures.Future which *may* be used to
# wait for and retrieve the result (or exception, if one was raised)
def submit_async(awaitable):
    return asyncio.run_coroutine_threadsafe(awaitable, _loop)

def stop_async():
    _loop.call_soon_threadsafe(_loop.stop)
Run Code Online (Sandbox Code Playgroud)

有了这些工具(并且可能在单独的模块中),您可以执行以下操作:

class Scanner:
    def __init__(self):
        submit_async(self.connection())
        # ...

    # ...
Run Code Online (Sandbox Code Playgroud)
  • 使用建议怎么样ProcessPoolExecutor

这些适用于在并行进程中运行 CPU 密集型代码以避免 GIL。如果您实际上正在运行异步代码,则不应该关心ProcessPoolExecutor.

  • 使用建议怎么样ThreadPoolExecutor

AThreadPoolExecutor只是一个对于经典多线程应用程序有用的线程池。在 Python 中,它主要用于使程序更具响应性,而不是使其更快。它允许您与交互式代码并行运行 CPU 密集型或阻塞代码,而不会出现饥饿问题。由于 GIL,它不会使事情变得更快。