如何从诸如stdin之类的流中使rx.py Observable可观察到?

blz*_*blz 5 python reactive-programming rx-py

我正在努力使rxpy函数反应式编程(FRP)进入图书馆,而我已经遇到了障碍。我正在编写一个小程序,希望通过标准输入(sys.stdin)来流式传输数据。

因此,我的问题很简单:如何创建一个rx.Observable从stdin异步读取的实例?是否有内置机制可Observable从流创建实例?

Dan*_*ter 4

我没用过RxPy,但是有点熟悉RxJS

RxPy许多内置方法可以用于此目的,但我倾向于创建一个 Observable 工厂。作为ObservableCreation.from_array我们的指导,让我们现在就尝试一下。(注意:我还没有运行此代码,但它应该可以帮助您完成大部分工作)

from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler

class ObservableFile(Observable, metaclass=ObservableMeta):

    @classmethod
    def from_file(cls, readableFile, scheduler=None):
        scheduler = scheduler or current_thread_scheduler

        def subscribe(observer):
            def action(action1, state=None):
                try:
                    observer.on_next(readableFile.next())
                    action1(action)

                except StopIteration: # EOF
                    observer.on_completed()

            return scheduler.schedule_recursive(action)
        return AnonymousObservable(subscribe)
Run Code Online (Sandbox Code Playgroud)

然后像这样使用它:

res = rx.Observable.from_file(sys.stdin)
Run Code Online (Sandbox Code Playgroud)

这将在 stdin 的每一行上创建一个可观察对象,直到 EOF。它是阻塞的,但有一些方法可以解决这个问题。它还可以使用不同的调度程序进行调整。