Bob*_*Bob 14 c python coroutine async-await micropython
环境:C和micropython虚拟机中的协作RTOS是其中的任务之一.
为了使虚拟机不会阻止其它RTOS的任务,我插入RTOS_sleep()的vm.c:DISPATCH(),这样,每次执行字节码后,虚拟机放弃控制到下一个RTOS任务.
我创建了一个uPy接口,使用生产者 - 消费者设计模式从物理数据总线异步获取数据 - 可以是CAN,SPI,以太网.
在uPy中的用法:
can_q = CANbus.queue()
message = can_q.get()
Run Code Online (Sandbox Code Playgroud)
C中的实现can_q.get()不会阻止RTOS:它轮询C队列,如果没有收到消息,它会调用RTOS_sleep()给另一个任务填充队列的机会.事物是同步的,因为C队列仅由另一个RTOS任务更新,而RTOS任务仅在RTOS_sleep()被调用时切换,即协作
C实现基本上是:
// gives chance for c-queue to be filled by other RTOS task
while(c_queue_empty() == true) RTOS_sleep();
return c_queue_get_message();
Run Code Online (Sandbox Code Playgroud)
尽管Python语句can_q.get()不会阻止RTOS,但它会阻止uPy脚本.我想重写它,所以我可以使用async defie coroutine并让它不阻止uPy脚本.
不确定语法,但这样的事情:
can_q = CANbus.queue()
message = await can_q.get()
Run Code Online (Sandbox Code Playgroud)
题
如何编写C函数以便我可以await使用它?
我更喜欢CPython和micropython的答案,但我会接受仅限CPython的答案.
use*_*342 16
注意:这个答案涵盖了CPython和asyncio框架.但是,这些概念应该适用于其他Python实现以及其他异步框架.
如何编写C函数以便我可以
await使用它?
编写可以等待结果的C函数的最简单方法是让它返回一个已经等待的对象,例如asyncio.Future.在返回之前Future,代码必须安排将来的结果由某些异步机制设置.所有这些基于协程的方法都假定您的程序在一些知道如何安排协同程序的事件循环下运行.
但是回归未来并不总是足够 - 也许我们想要定义一个具有任意数量的悬挂点的对象.返回一个未来只暂停一次(如果返回的未来未完成),一旦未来完成就恢复,就是这样.等同于async def包含多个对象的等待对象await无法通过返回未来来实现,它必须实现协同程序通常实现的协议.这有点像实现自定义的迭代器__next__,而不是生成器.
为了定义我们自己的等待类型,我们可以转向PEP 492,它精确地指定了可以传递给哪些对象await.除了定义的Python函数之外async def,用户定义的类型可以通过定义__await__Python/C映射到结构tp_as_async.am_await部分的特殊方法来使对象等待PyTypeObject.
这意味着在Python/C中,您必须执行以下操作:
tp_as_async扩展类型的字段指定非NULL值.am_await成员指向一个接受你的类型实例的C函数,并返回另一个实现迭代器协议的扩展类型的实例,即定义tp_iter(通常定义为PyIter_Self)和tp_iternext.tp_iternext必须推进协程的状态机.每个非特殊回报都tp_iternext对应一个暂停,最后一个StopIteration异常表示从协同程序的最终返回.返回值存储在value属性中StopIteration.为了使协程有用,它还必须能够与驱动它的事件循环通信,以便它可以指定何时在它被挂起后恢复.asyncio定义的大多数协同程序都希望在asyncio事件循环下运行,并在内部使用asyncio.get_event_loop()(和/或接受显式loop参数)来获取其服务.
为了说明Python/C代码需要实现什么,让我们考虑用Python表示的简单协程async def,例如这相当于asyncio.sleep():
async def my_sleep(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
await future
# we get back here after the timeout has elapsed, and
# immediately return
Run Code Online (Sandbox Code Playgroud)
my_sleep创建一个Future,安排它在n秒内完成(其结果设置),并暂停直到将来完成.最后一部分使用await,await x意思是"允许x决定我们现在是暂停还是继续执行".一个不完整的未来总是决定暂停,并且asyncio Task协程驱动程序特殊情况产生了期货无限期暂停它们并连接它们的完成以恢复任务.其他事件循环(curio等)的暂停机制可能在细节上有所不同,但基本思想是相同的:await是可选的执行暂停.
__await__() 返回一个生成器要将其转换为C,我们必须摆脱魔术async def函数定义以及await暂停点.删除async def它非常简单:等效的普通函数只需要返回一个实现的对象__await__:
def my_sleep(n):
return _MySleep(n)
class _MySleep:
def __init__(self, n):
self.n = n
def __await__(self):
return _MySleepIter(self.n)
Run Code Online (Sandbox Code Playgroud)
返回__await__的_MySleep对象的方法my_sleep()将由await操作符自动调用,以将等待对象(传递给它的任何对象await)转换为迭代器.此迭代器将用于询问等待的对象是选择暂停还是提供值.这很像for o in x语句调用x.__iter__()将iterable 转换x为具体迭代器的方式.
当返回的迭代器选择暂停时,它只需要生成一个值.值的含义(如果有的话)将由协程驱动程序解释,通常是事件循环的一部分.当迭代器选择停止执行并返回时await,它需要停止迭代.使用生成器作为便利迭代器实现,_MySleepIter如下所示:
def _MySleepIter(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
# yield from future.__await__()
for x in future.__await__():
yield x
Run Code Online (Sandbox Code Playgroud)
作为await x映射yield from x.__await__(),我们的生成器必须耗尽返回的迭代器future.__await__().返回的迭代器Future.__await__将在未来不完整时生成,并返回未来的结果(我们在此忽略,但yield from实际上提供)否则.
__await__() 返回自定义迭代器在C中实现C的最后一个障碍my_sleep是使用生成器_MySleepIter.幸运的是,任何生成器都可以转换为有状态迭代器,它会__next__执行下一个等待或返回的代码.__next__实现生成器代码的状态机版本,其中yield通过返回值和return通过提升来表示StopIteration.例如:
class _MySleepIter:
def __init__(self, n):
self.n = n
self.state = 0
def __iter__(self): # an iterator has to define __iter__
return self
def __next__(self):
if self.state == 0:
loop = asyncio.get_event_loop()
self.future = loop.create_future()
loop.call_later(self.n, self.future.set_result, None)
self.state = 1
if self.state == 1:
if not self.future.done():
return next(iter(self.future))
self.state = 2
if self.state == 2:
raise StopIteration
raise AssertionError("invalid state")
Run Code Online (Sandbox Code Playgroud)
以上是一些打字,但它可以工作,并且只使用可以使用本机Python/C函数定义的构造.
实际上将这两个类翻译成C非常简单,但超出了这个答案的范围.