blz*_*blz 5 python reactive-programming rx-py
我正在努力使rxpy函数反应式编程(FRP)进入图书馆,而我已经遇到了障碍。我正在编写一个小程序,希望通过标准输入(sys.stdin)来流式传输数据。
因此,我的问题很简单:如何创建一个rx.Observable从stdin异步读取的实例?是否有内置机制可Observable从流创建实例?
我没用过RxPy,但是有点熟悉RxJS。
RxPy有许多内置方法可以用于此目的,但我倾向于创建一个 Observable 工厂。作为ObservableCreation.from_array我们的指导,让我们现在就尝试一下。(注意:我还没有运行此代码,但它应该可以帮助您完成大部分工作)
from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler
class ObservableFile(Observable, metaclass=ObservableMeta):
@classmethod
def from_file(cls, readableFile, scheduler=None):
scheduler = scheduler or current_thread_scheduler
def subscribe(observer):
def action(action1, state=None):
try:
observer.on_next(readableFile.next())
action1(action)
except StopIteration: # EOF
observer.on_completed()
return scheduler.schedule_recursive(action)
return AnonymousObservable(subscribe)
Run Code Online (Sandbox Code Playgroud)
然后像这样使用它:
res = rx.Observable.from_file(sys.stdin)
Run Code Online (Sandbox Code Playgroud)
这将在 stdin 的每一行上创建一个可观察对象,直到 EOF。它是阻塞的,但有一些方法可以解决这个问题。它还可以使用不同的调度程序进行调整。