标签: rx-py

如何使用rxpython超时长时间运行的程序?

假设我有一个长期运行的python函数,看起来像这样?

import random
import time
from rx import Observable
def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x
Run Code Online (Sandbox Code Playgroud)

我希望能够设置超时1000ms.

所以我就像这样,通过上面的强烈计算创建一个可观察的并映射它.

a = Observable.repeat(1).map(lambda x: intns(x))
Run Code Online (Sandbox Code Playgroud)

现在,每个值发出的,如果超过1000毫秒我就越想尽快结束观察到,当我到达1000ms使用on_erroron_completed

a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))
Run Code Online (Sandbox Code Playgroud)

上面的语句确实得到超时和调用on_error,但它继续完成计算强烈的计算,然后才返回到下一个语句.有没有更好的方法呢?

最后一个语句打印以下内容

8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends
Run Code Online (Sandbox Code Playgroud)

这个想法是,如果一个特定的函数超时,我希望能够继续我的程序,并忽略结果.

我想知道 …

python reactive-programming system.reactive rx-py

14
推荐指数
1
解决办法
374
查看次数

订阅反应源的Python Web服务在对象中产生奇怪的行为

我使用Falcon实现了一个Web服务.此服务存储状态机(pytransitions),该状态机将传递给构造函数中的服务资源.该服务与gunicorn一起运行.

Web服务启动使用RxPy的过程.在其中返回的事件on_next(event)用于触发状态机中的转换.

错误

我希望状态机在服务和资源中都具有一致的状态,但似乎在资源中状态永远不会改变.

我们有一个试图重现这种行为的测试,但令人惊讶的是测试工作

class TochoLevel(object):

    def __init__(self, tochine):
        self.tochine = tochine

    def on_get(self, req, res):
        res.status = falcon.HTTP_200
        res.body = self.tochine.state


def get_machine():
    states = ["low", "medium", "high"]

    transitions = [
        {'trigger': 'to_medium', 'source': ['low', 'medium', 'high'], 'dest': 'medium'},
        {'trigger': 'to_high', 'source': ['low', 'medium', 'high'], 'dest': 'high'},
        {'trigger': 'to_low', 'source': ['low', 'medium', 'high'], 'dest': 'low'}
    ]

    locked_factory = MachineFactory.get_predefined(locked=True)

    return locked_factory(
        states=states,
        transitions=transitions,
        initial='low',
        auto_transitions=False,
        queued=False …
Run Code Online (Sandbox Code Playgroud)

python gunicorn falconframework rx-py

10
推荐指数
1
解决办法
248
查看次数

Python 中的数组解构

我希望vals最后一行更加清楚。

import rx
from rx import operators as op

light_stream = rx.range(1, 10).pipe(
            op.with_latest_from(irradiance_stream),
            op.map(lambda vals: print(vals[0], vals[1])))  # light, irradiance
Run Code Online (Sandbox Code Playgroud)

有没有像这样的数组解构之类的东西

op.map(lambda [light, irradiance]: print(light_intensity, irradiance))) # fake
Run Code Online (Sandbox Code Playgroud)

或者其他方式让代码清晰?谢谢

python lambda rx-py

9
推荐指数
1
解决办法
2万
查看次数

RxPy:在(慢)扫描执行之间排序热观察

TL; DR我正在寻求帮助来实现下面的大理石图.目的是尽可能地对未排序的值进行排序,而无需等待扫描执行之间的时间.

我不是要求全面实施.欢迎任何指导. 没消耗最小的大理石图 我有一个异步慢(强制测试目的)扫描无限热观察.这是相关代码:

thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()

external_obs \
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
    .scan(seed=State(0, None), accumulator=slow_scan_msg) \
    .subscribe(log, print, lambda: print("SLOW FINISHED"))

external_obs.connect()
thread.start()

def slow_scan_msg(state, msg):
    sleep(0.4)
    return state \
        ._replace(count = state.count + 1) \
        ._replace(last_msg = msg)
Run Code Online (Sandbox Code Playgroud)

这是完整版:https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd

这是当前输出(值是随机生成的):

emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 …
Run Code Online (Sandbox Code Playgroud)

python reactive-programming observable rx-py reactivex

7
推荐指数
1
解决办法
311
查看次数

如何从诸如stdin之类的流中使rx.py Observable可观察到?

我正在努力使rxpy函数反应式编程(FRP)进入图书馆,而我已经遇到了障碍。我正在编写一个小程序,希望通过标准输入(sys.stdin)来流式传输数据。

因此,我的问题很简单:如何创建一个rx.Observable从stdin异步读取的实例?是否有内置机制可Observable从流创建实例?

python reactive-programming rx-py

5
推荐指数
1
解决办法
2454
查看次数

RxPY 中带有 from_iterable/range 的 subscribe_on

我正在尝试了解 python 反应式扩展的调度。我想用来subscribe_on并行处理多个可观察量。如果可观察量是使用 创建的,则效果很好,但如果使用just例如range或 ,则效果不佳。from_

just默认为Scheduler.immediate,而其他生成器默认为Scheduler.current_thread。这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


# Creates new thread (I like)
rx.Observable.just(3)\
    .do_action(work)\
    .subscribe_on(Scheduler.new_thread)\
    .subscribe(finish)

# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
    .do_action(work) \
    .subscribe_on(Scheduler.new_thread) \
    .subscribe(finish)
Run Code Online (Sandbox Code Playgroud)

它可以与调度程序一起使用observe_on,或者如果调度程序直接传递给生成器,但我想将可观察的创建与处理分离并实现如下所示: …

python scheduler reactive-programming rx-py

5
推荐指数
1
解决办法
2506
查看次数

当多个输入同时打勾时,如何防止 combine_latest 多次触发?

我正在使用Reactive Extensions 'combine_latest在任何输入勾选时执行操作。问题是如果多个输入蜱在同一时间然后combine_latest触发各个输入蜱后多次。这会引起头痛,因为combine_latest它实际上是用陈旧的值进行虚假滴答。

最小工作示例,其中fastobservable 每 10ms 滴答一次,而slowobservable 每 30ms 滴答一次:

from rx.concurrency import HistoricalScheduler
from rx import Observable
from __future__ import print_function

scheduler = HistoricalScheduler(initial_clock=1000)
fast = Observable.generate_with_relative_time(1, lambda x: True, lambda x: x + 1, lambda x: x, lambda x: 10, scheduler=scheduler)
slow = Observable.generate_with_relative_time(3, lambda x: True, lambda x: x + 3, lambda x: x, lambda x: 30, scheduler=scheduler)
Observable.combine_latest(fast, slow, lambda …
Run Code Online (Sandbox Code Playgroud)

python reactive-programming rx-py

5
推荐指数
1
解决办法
1902
查看次数

如何使用asyncio在单独的线程上通知RxPY观察者?

(注意:此问题的背景非常详细,但底部有一个SSCCE可以跳过)

背景

我正在尝试开发基于Python的CLI来与Web服务进行交互.在我的代码库中,我有一个CommunicationService类来处理与Web服务的所有直接通信.它公开了一个received_response属性,该属性返回Observable其他对象可以订阅的(来自RxPY),以便在从Web服务接收到响应时得到通知.

我将我的CLI逻辑基于click库,其中一个子命令实现如下:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
    self._generate_request(request)
    if response_handler is None:
        return None

    while True:
        response = await self.on_response
        success, value = response_handler(response)
        print(success, value)
        if success:
            return value
Run Code Online (Sandbox Code Playgroud)

这里发生的事情(在response_handler不是这种情况下None)是子命令表现为一个协程,它等待来自Web服务(self.on_response == CommunicationService.received_response)的响应,并从它可以处理的第一个响应中返回一些处理过的值.

我试图通过创建CommunicationService完全模拟的测试用例来测试CLI的行为; Subject创建一个假(可以作为一个假Observable)并被CommunicationService.received_response嘲笑返回它.作为测试的一部分,on_next调用主题的方法将模拟Web服务响应传递回生产代码:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    context.mock_received_response_subject.on_next(context.text)
Run Code Online (Sandbox Code Playgroud)

我使用单击'结果回调'函数,该函数在CLI调用结束时被调用并阻塞,直到coroutine(子命令)完成: …

python multithreading python-3.x python-asyncio rx-py

5
推荐指数
1
解决办法
1302
查看次数

如何等待 RxPy 并行线程完成

基于这个优秀的 SO 答案,我可以在 RxPy 中并行处理多个任务,我的问题是你如何等待它们全部完成?我知道我可以使用线程,.join()但是 Rx 调度器似乎没有任何这样的选项。.to_blocking()也无济于事, MainThread 在触发所有通知并调用完整处理程序之前完成。下面是一个例子:

from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread

def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))

def intense_calculation(value):
    printthread("calc {}".format(value))
    time.sleep(random.randint(5, 20) * .1)
    return value

if __name__ == "__main__":
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=lambda: printthread("on_completed"),
            on_error=lambda err: printthread("on_error: {}".format(err))) …
Run Code Online (Sandbox Code Playgroud)

python multithreading python-multithreading rx-py reactivex

5
推荐指数
2
解决办法
2627
查看次数

in ReactiveX, how do I pass other parameters to Observer.create?

Using RxPY for illustration purposes.

I want to create an observable from a function, but that function must take parameters. This particular example must return, at random intervals, one of many pre-defined tickers which I want to send to it. My solution thus far is to use a closure:

from __future__ import print_function

from rx import Observable
import random
import string
import time

def make_tickers(n = 300, s = 123):
    """ generates up to n unique 3-letter strings geach makde …
Run Code Online (Sandbox Code Playgroud)

python rx-py reactivex

5
推荐指数
1
解决办法
491
查看次数