标签: rx-py

rxpy 有效地组合可观察量

简介: 你好。我正在为我的用例探索 python rxpy库 - 我正在使用反应式编程概念构建执行管道。这样我希望我就不必操纵太多的状态。虽然我的解决方案似乎有效,但我在尝试从其他 Observable 组合一个新的 Observable 时遇到了麻烦。

问题是我编写可观察值的方式导致一些昂贵的计算重复两次。为了性能,我真的想防止触发昂贵的计算。

我对反应式编程很陌生。试图挠头并浏览互联网资源和参考文档 - 对我来说似乎有点太简洁了。请指教。

以下是一个玩具示例,说明了我正在做的事情:

import rx
from rx import operators as op
from rx.subject import Subject

root = Subject()

foo = root.pipe(
        op.map( lambda x : x + 1 ),
        op.do_action(lambda r: print("foo(x) = %s (expensive)" % str(r)))
    )

bar_foo = foo.pipe(
        op.map( lambda x : x * 2 ),
        op.do_action(lambda r: print("bar(foo(x)) = %s" % str(r)))
    )

bar_foo.pipe(
        op.zip(foo),
        op.map(lambda i: i[0]+i[1]),
        op.do_action(lambda r: print("foo(x) + bar(foo(x)) …
Run Code Online (Sandbox Code Playgroud)

python rx-py

5
推荐指数
1
解决办法
1116
查看次数

在0x1356e10>的functools.partial对象的<method-wrapper'__call__'不是Python函数

我正在尝试构建一个函数,我可以将其用作我正在映射的RxPy流的处理程序.我所需要的函数需要访问定义该变量的范围之外的变量,对我而言,这意味着我需要使用某种闭包.所以我到达functools.partial来关闭一个变量并返回一个部分函数,​​我可以作为观察者传递给我的流.

但是,这样做会导致以下结果:

Traceback (most recent call last):
  File "retry/example.py", line 46, in <module>
    response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
  File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/linq/observable/selectmany.py", line 67, in select_many
    selector = adapt_call(selector)
  File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/internal/utils.py", line 37, in adapt_call_1
    argnames, varargs, kwargs = getargspec(func)[:3]
  File "/usr/lib/python2.7/inspect.py", line 816, in getargspec
    raise TypeError('{!r} is not a Python function'.format(func))
TypeError: <method-wrapper '__call__' of functools.partial object at 0x2ce6cb0> is not a Python function
Run Code Online (Sandbox Code Playgroud)

以下是一些重现问题的示例代码:

from __future__ import absolute_import
from rx import Observable, Observer
from pykafka import KafkaClient
from pykafka.common import …
Run Code Online (Sandbox Code Playgroud)

python partial functools rx-py

4
推荐指数
1
解决办法
844
查看次数

如何使用可观察的 RxPY 间隔定期调用异步协程?

我需要创建一个 Observable 流,它定期发出异步协程的结果。

intervalRead是一个函数,它返回一个 Observable,并将间隔rate和异步协程函数作为参数fun,需要在定义的间隔内调用。

我的第一个方法是用interval factory方法创建一个observable,然后用map调用协程,用from_future把它包装成一个Observable,然后得到协程返回的值。

async def foo():
    await asyncio.sleep(1)
    return 42

def intervalRead(rate, fun) -> Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        map(lambda i: rx.from_future(loop.create_task(fun()))),
    )

async def main():
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next= lambda item: print(item)
    )

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

Run Code Online (Sandbox Code Playgroud)

然而我得到的输出不是协程的结果,而是 from_future 返回的 Observable,在指定的时间间隔发出

输出: <rx.core.observable.observable.Observable object at 0x033B5650>

我怎么能得到那个 Observable 返回的实际值?我希望 42

我的第二种方法是创建一个自定义的 observable:


def intervalRead(rate, fun) -> rx.Observable:
    interval = rx.interval(rate)
    def subs(observer: Observer, scheduler = None): …
Run Code Online (Sandbox Code Playgroud)

python python-asyncio rx-py reactivex

4
推荐指数
1
解决办法
2457
查看次数

Python在生产中从命令式转变为功能性

我感染了函数式编程和反应式方法.对于灵感和想法,我使用Haskell一篇很棒的Rick Hickey文章.在python世界中,我为自己找到了RxPyfuncy库.现在我有数千行命令式代码,我希望它能够实现功能.如果一切都简化了,我有一个数据库的getter和setter接口以及一个像状态机一样工作的内核.以下是伪代码的外观:

class State(object):
    def __init__(id):
        self.data = database_interface.get_state(id)
        self.status = data['status']

    def process(self):
         if self.status == 'init':
             self.handle_init()
         elif self.status == 'request_data':
             self.handle_request_data()
         elif self.status == 'idle':
             self.handle_idle()
         # etc...


...

def on_new_message(msg):
    id = msg['id']
    state = State(id)
    state.process()
Run Code Online (Sandbox Code Playgroud)

我有很多的iffor必要的业务逻辑,我的状态处理程序.我真的很尴尬,如何从当前模式转变为反应性和功能性.这里的一切都非常简单,但谁已经有类似的经历会理解我.我需要建议接下来要去哪里,从想法到实践,比功能风格的简单实用程序或简单的REST api更大.此外,无论我在哪里获得想法,我都会得到非常大的项目源代码的链接.感谢所有回复的人,将实际操作代码移植到功能代码中.是的,我知道它不会移植代码,而是从头开始重写代码.同样,我需要具有大量业务逻辑的项目示例,其中包含数据和数据突变的工作.无论如何,谢谢.

functional-programming reactive-programming python-3.x rx-py

2
推荐指数
1
解决办法
213
查看次数