小编tom*_*ssa的帖子

如何使用可观察的 RxPY 间隔定期调用异步协程?

我需要创建一个 Observable 流,它定期发出异步协程的结果。

intervalRead是一个函数,它返回一个 Observable,并将间隔rate和异步协程函数作为参数fun,需要在定义的间隔内调用。

我的第一个方法是用interval factory方法创建一个observable,然后用map调用协程,用from_future把它包装成一个Observable,然后得到协程返回的值。

async def foo():
    await asyncio.sleep(1)
    return 42

def intervalRead(rate, fun) -> Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        map(lambda i: rx.from_future(loop.create_task(fun()))),
    )

async def main():
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next= lambda item: print(item)
    )

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

Run Code Online (Sandbox Code Playgroud)

然而我得到的输出不是协程的结果,而是 from_future 返回的 Observable,在指定的时间间隔发出

输出: <rx.core.observable.observable.Observable object at 0x033B5650>

我怎么能得到那个 Observable 返回的实际值?我希望 42

我的第二种方法是创建一个自定义的 observable:


def intervalRead(rate, fun) -> rx.Observable:
    interval = rx.interval(rate)
    def subs(observer: Observer, scheduler = None): …
Run Code Online (Sandbox Code Playgroud)

python python-asyncio rx-py reactivex

4
推荐指数
1
解决办法
2457
查看次数

标签 统计

python ×1

python-asyncio ×1

reactivex ×1

rx-py ×1