简介: 你好。我正在为我的用例探索 python rxpy库 - 我正在使用反应式编程概念构建执行管道。这样我希望我就不必操纵太多的状态。虽然我的解决方案似乎有效,但我在尝试从其他 Observable 组合一个新的 Observable 时遇到了麻烦。
问题是我编写可观察值的方式导致一些昂贵的计算重复两次。为了性能,我真的想防止触发昂贵的计算。
我对反应式编程很陌生。试图挠头并浏览互联网资源和参考文档 - 对我来说似乎有点太简洁了。请指教。
以下是一个玩具示例,说明了我正在做的事情:
import rx
from rx import operators as op
from rx.subject import Subject
root = Subject()
foo = root.pipe(
op.map( lambda x : x + 1 ),
op.do_action(lambda r: print("foo(x) = %s (expensive)" % str(r)))
)
bar_foo = foo.pipe(
op.map( lambda x : x * 2 ),
op.do_action(lambda r: print("bar(foo(x)) = %s" % str(r)))
)
bar_foo.pipe(
op.zip(foo),
op.map(lambda i: i[0]+i[1]),
op.do_action(lambda r: print("foo(x) + bar(foo(x)) = %s" % str(r)))
).subscribe()
print("-------------")
root.on_next(10)
print("-------------")
Run Code Online (Sandbox Code Playgroud)
输出:
-------------
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x) = 11 (expensive)
foo(x) + bar(foo(x)) = 33
-------------
Run Code Online (Sandbox Code Playgroud)
您可能会想到foo()并且bar()是昂贵且复杂的操作。我首先构建一个可观察的foo. 然后编写一个bar_foo包含foo. 随后将两者压缩在一起计算最终结果foo(x)+bar(foo(x))。
问题:
我该如何防止foo()单个输入被多次触发?我有非常充分的理由保持foo()和bar()分离。另外我也不想明确地记忆foo()。
任何有在生产中使用 rxpy 经验的人都可以分享他们的经验。与同等的手工编写(但不可维护)代码相比,使用 rxpy 会带来更好的性能还是速度变慢?
op.share()在管道中进行昂贵的计算之后立即添加foo在这里可能很有用。因此将foo管道更改为:
foo = root.pipe(
op.map( lambda x : x + 1 ),
op.do_action(lambda r: print("foo(x) = %s (expensive)" % str(r))),
op.share() # added to pipeline
)
Run Code Online (Sandbox Code Playgroud)
将导致:
-------------
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x) + bar(foo(x)) = 33
-------------
Run Code Online (Sandbox Code Playgroud)
我相信这.share()使得昂贵操作的发出事件在下游订阅者之间共享,以便单个昂贵计算的结果可以多次使用。
关于你的第二个问题;我也是 RxPy 的新手,因此对更有经验的用户的答案很感兴趣。到目前为止,我注意到作为初学者,您可以轻松创建(糟糕的)管道,其中消息和计算在后台重复。.share()似乎在某种程度上减少了这种情况,但不确定后台发生了什么。
| 归档时间: |
|
| 查看次数: |
1116 次 |
| 最近记录: |