假设我有一个长期运行的python函数,看起来像这样?
import random
import time
from rx import Observable
def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
time.sleep(y)
print('end')
return x
Run Code Online (Sandbox Code Playgroud)
我希望能够设置超时1000ms.
所以我就像这样,通过上面的强烈计算创建一个可观察的并映射它.
a = Observable.repeat(1).map(lambda x: intns(x))
Run Code Online (Sandbox Code Playgroud)
现在,每个值发出的,如果超过1000毫秒我就越想尽快结束观察到,当我到达1000ms使用on_error或on_completed
a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))
Run Code Online (Sandbox Code Playgroud)
上面的语句确实得到超时和调用on_error,但它继续完成计算强烈的计算,然后才返回到下一个语句.有没有更好的方法呢?
最后一个语句打印以下内容
8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends
Run Code Online (Sandbox Code Playgroud)
这个想法是,如果一个特定的函数超时,我希望能够继续我的程序,并忽略结果.
我想知道 …
我使用Falcon实现了一个Web服务.此服务存储状态机(pytransitions),该状态机将传递给构造函数中的服务资源.该服务与gunicorn一起运行.
Web服务启动使用RxPy的过程.在其中返回的事件on_next(event)用于触发状态机中的转换.
错误
我希望状态机在服务和资源中都具有一致的状态,但似乎在资源中状态永远不会改变.
我们有一个试图重现这种行为的测试,但令人惊讶的是测试工作
class TochoLevel(object):
def __init__(self, tochine):
self.tochine = tochine
def on_get(self, req, res):
res.status = falcon.HTTP_200
res.body = self.tochine.state
def get_machine():
states = ["low", "medium", "high"]
transitions = [
{'trigger': 'to_medium', 'source': ['low', 'medium', 'high'], 'dest': 'medium'},
{'trigger': 'to_high', 'source': ['low', 'medium', 'high'], 'dest': 'high'},
{'trigger': 'to_low', 'source': ['low', 'medium', 'high'], 'dest': 'low'}
]
locked_factory = MachineFactory.get_predefined(locked=True)
return locked_factory(
states=states,
transitions=transitions,
initial='low',
auto_transitions=False,
queued=False …Run Code Online (Sandbox Code Playgroud) 我希望vals最后一行更加清楚。
import rx
from rx import operators as op
light_stream = rx.range(1, 10).pipe(
op.with_latest_from(irradiance_stream),
op.map(lambda vals: print(vals[0], vals[1]))) # light, irradiance
Run Code Online (Sandbox Code Playgroud)
有没有像这样的数组解构之类的东西
op.map(lambda [light, irradiance]: print(light_intensity, irradiance))) # fake
Run Code Online (Sandbox Code Playgroud)
或者其他方式让代码清晰?谢谢
TL; DR我正在寻求帮助来实现下面的大理石图.目的是尽可能地对未排序的值进行排序,而无需等待扫描执行之间的时间.
我不是要求全面实施.欢迎任何指导.
我有一个异步慢(强制测试目的)扫描无限热观察.这是相关代码:
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
def slow_scan_msg(state, msg):
sleep(0.4)
return state \
._replace(count = state.count + 1) \
._replace(last_msg = msg)
Run Code Online (Sandbox Code Playgroud)
这是完整版:https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd
这是当前输出(值是随机生成的):
emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 …Run Code Online (Sandbox Code Playgroud) 我正在努力使rxpy函数反应式编程(FRP)进入图书馆,而我已经遇到了障碍。我正在编写一个小程序,希望通过标准输入(sys.stdin)来流式传输数据。
因此,我的问题很简单:如何创建一个rx.Observable从stdin异步读取的实例?是否有内置机制可Observable从流创建实例?
我正在尝试了解 python 反应式扩展的调度。我想用来subscribe_on并行处理多个可观察量。如果可观察量是使用 创建的,则效果很好,但如果使用just例如range或 ,则效果不佳。from_
just默认为Scheduler.immediate,而其他生成器默认为Scheduler.current_thread。这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
# Creates new thread (I like)
rx.Observable.just(3)\
.do_action(work)\
.subscribe_on(Scheduler.new_thread)\
.subscribe(finish)
# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
.do_action(work) \
.subscribe_on(Scheduler.new_thread) \
.subscribe(finish)
Run Code Online (Sandbox Code Playgroud)
它可以与调度程序一起使用observe_on,或者如果调度程序直接传递给生成器,但我想将可观察的创建与处理分离并实现如下所示: …
我正在使用Reactive Extensions 'combine_latest在任何输入勾选时执行操作。问题是如果多个输入蜱在同一时间然后combine_latest触发各个输入蜱后多次。这会引起头痛,因为combine_latest它实际上是用陈旧的值进行虚假滴答。
最小工作示例,其中fastobservable 每 10ms 滴答一次,而slowobservable 每 30ms 滴答一次:
from rx.concurrency import HistoricalScheduler
from rx import Observable
from __future__ import print_function
scheduler = HistoricalScheduler(initial_clock=1000)
fast = Observable.generate_with_relative_time(1, lambda x: True, lambda x: x + 1, lambda x: x, lambda x: 10, scheduler=scheduler)
slow = Observable.generate_with_relative_time(3, lambda x: True, lambda x: x + 3, lambda x: x, lambda x: 30, scheduler=scheduler)
Observable.combine_latest(fast, slow, lambda …Run Code Online (Sandbox Code Playgroud) (注意:此问题的背景非常详细,但底部有一个SSCCE可以跳过)
我正在尝试开发基于Python的CLI来与Web服务进行交互.在我的代码库中,我有一个CommunicationService类来处理与Web服务的所有直接通信.它公开了一个received_response属性,该属性返回Observable其他对象可以订阅的(来自RxPY),以便在从Web服务接收到响应时得到通知.
我将我的CLI逻辑基于click库,其中一个子命令实现如下:
async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
self._generate_request(request)
if response_handler is None:
return None
while True:
response = await self.on_response
success, value = response_handler(response)
print(success, value)
if success:
return value
Run Code Online (Sandbox Code Playgroud)
这里发生的事情(在response_handler不是这种情况下None)是子命令表现为一个协程,它等待来自Web服务(self.on_response == CommunicationService.received_response)的响应,并从它可以处理的第一个响应中返回一些处理过的值.
我试图通过创建CommunicationService完全模拟的测试用例来测试CLI的行为; Subject创建一个假(可以作为一个假Observable)并被CommunicationService.received_response嘲笑返回它.作为测试的一部分,on_next调用主题的方法将模拟Web服务响应传递回生产代码:
@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
context.mock_received_response_subject.on_next(context.text)
Run Code Online (Sandbox Code Playgroud)
我使用单击'结果回调'函数,该函数在CLI调用结束时被调用并阻塞,直到coroutine(子命令)完成: …
基于这个优秀的 SO 答案,我可以在 RxPy 中并行处理多个任务,我的问题是你如何等待它们全部完成?我知道我可以使用线程,.join()但是 Rx 调度器似乎没有任何这样的选项。.to_blocking()也无济于事, MainThread 在触发所有通知并调用完整处理程序之前完成。下面是一个例子:
from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
def printthread(val):
print("{}, thread: {}".format(val, current_thread().name))
def intense_calculation(value):
printthread("calc {}".format(value))
time.sleep(random.randint(5, 20) * .1)
return value
if __name__ == "__main__":
Observable.range(1, 3) \
.select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout)) \
.observe_on(Scheduler.event_loop) \
.subscribe(
on_next=lambda x: printthread("on_next: {}".format(x)),
on_completed=lambda: printthread("on_completed"),
on_error=lambda err: printthread("on_error: {}".format(err))) …Run Code Online (Sandbox Code Playgroud) Using RxPY for illustration purposes.
I want to create an observable from a function, but that function must take parameters. This particular example must return, at random intervals, one of many pre-defined tickers which I want to send to it. My solution thus far is to use a closure:
from __future__ import print_function
from rx import Observable
import random
import string
import time
def make_tickers(n = 300, s = 123):
""" generates up to n unique 3-letter strings geach makde …Run Code Online (Sandbox Code Playgroud) python ×10
rx-py ×10
reactivex ×3
gunicorn ×1
lambda ×1
observable ×1
python-3.x ×1
scheduler ×1