标签: reactivex

RxJava:如果最后一个任务完成,如何每5秒执行一次任务

我正在使用RxJava.我必须每5秒执行一次特定的任务.通过使用"Observable.Interval"方法完美地工作.但是,我还有以下约束:如果最后一个任务没有完成,则不能执行新任务.在这种情况下,新任务只有在最后一个完成时才需要执行.

我无法弄清楚如何使用RxJava做到这一点.

所有的想法都会非常感激^^

谢谢阅读.

reactive-programming rx-java rx-android reactivex

8
推荐指数
2
解决办法
4819
查看次数

如何将对象传递给 rxjs subscribe() 回调函数?

我正在开发一个 Ionic2 应用程序,使用 cordova-plugin-network-information,我从我的 app.ts 订阅连接和断开连接事件,并希望能够将对我的 NavController 的引用和加载组件传递到订阅中() 回调,因此每当断开连接事件触发时,我都可以在 UI 顶部向用户显示正在加载的叠加层。我看到在回调中对“this”对象的引用更改为名为“SafeSubscriber”的对象,我认为这是其 Observer 对象的 rxjs 类型类,我这里遇到的问题是我无法获取这些对象使用 Chrome DevTools 将 app.ts 中可用的实例添加到回调内的此代码中,我也无法找到离开此上下文的方法来访问 App 对象本身。

这是我的代码:

    ngOnInit()
    {
      // Nav Controller:
      this.nav = <NavController>this.app.getComponent('nav');

      // Disconnect Detection
      let disconnectSubscription = Network.onDisconnect().subscribe(() =>
      {
          console.log('Disconnect Detected');
          // Push Loading into nav stack here...!
          // this.nav.present(this.loading);  <- no this object...
      });
    }
Run Code Online (Sandbox Code Playgroud)

这是我在 Chrome DevTools 中查询“this”对象时得到的结果(这应该将其原始上下文保留在 lambda [粗箭头] 函数中,这是正确的吗?)

在此输入图像描述

我尝试在订阅之前设置一个“that”对象,以便变量“this”不会干扰回调“this”范围,它在这种情况下不起作用,因为“that”是立即声明的当触发断开连接事件时,在 subscribe() (假设:any = this;)在回调内部未定义之前。

我知道这不是放置直接更改 UI 的代码的最佳位置,但我没有看到其他地方,因为我这里需要的是一个全局事件处理程序,只要没有检测到连接且用户没有连接,该处理程序就通过设置此覆盖来工作正在查看应用程序内的某些页面。

我认为应该有一个非常简单和优雅的解决方案,但我似乎无法找到它。有没有办法将参数传递给 subscribe() 函数?某种带有我需要的引用的对象?

提前致谢。

rxjs reactivex ionic2 angular

7
推荐指数
1
解决办法
4333
查看次数

ReactiveX:不会破坏observable的错误处理

它不清楚如何在REactiveX中向订阅者传播错误,以便Observable不会被破坏.

observable.onNext(1);
observable.onNext(2);
observable.onError("Nope");
observable.onNext(3);<<won't work.
Run Code Online (Sandbox Code Playgroud)

我接受这个限制,但是我仍然有一个场景,我希望下游的听众知道发生了错误,我不希望observable死掉.

这个用例的主要用例是UI代码,如果出现错误,我不希望对之前注册的所有可观察对象调用"Setup".

可能的选择是

a)推送具有数据字段和错误字段的自定义对象

class Data
{
    int value;
    Error * error;  
}
Run Code Online (Sandbox Code Playgroud)

我不喜欢这个解决方案

b)有两个流.一个用于数据,一个用于错误.

observable.onNext(1);
observable.onNext(2);
errorObservable.onNext("Error");
observable.onNext(3);
Run Code Online (Sandbox Code Playgroud)

这方面最常见的做法是什么?

reactivex

7
推荐指数
1
解决办法
470
查看次数

7
推荐指数
1
解决办法
2140
查看次数

在反应式编程中循环之间的依赖关系

在涉及反应式编程时,我经常会遇到两个流相互依赖的情况.解决这些案件的惯用方法是什么?

一个最小的例子:有按钮A和B,都显示一个值.单击A必须将A的值增加B.单击B必须将B的值设置为A.

我可以提出第一个解决方案(例如在F#中,但欢迎任何语言的答案):

let solution1 buttonA buttonB =
    let mutable lastA = 0
    let mutable lastB = 1
    let a = new Subject<_> ()
    let b = new Subject<_> ()
    (OnClick buttonA).Subscribe(fun _ -> lastA <- lastA + lastB; a.OnNext lastA) 
    (OnClick buttonB).Subscribe(fun _ -> lastB <- lastA; b.OnNext lastB)
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonA)
    a.OnNext 0
    b.OnNext 1
Run Code Online (Sandbox Code Playgroud)

这个解决方案使用可变状态和主题,它不是非常易读,也不像惯用语.

我尝试的第二个解决方案涉及创建一个将两个相关流链接在一起的方法:

let dependency (aGivenB: IObservable<_> -> IObservable<_>) (bGivenA: IObservable<_> -> IObservable<_>) =
    let bProxy = new ReplaySubject<_> () 
    let a = …
Run Code Online (Sandbox Code Playgroud)

c# f# reactive-programming system.reactive reactivex

7
推荐指数
1
解决办法
170
查看次数

RxJava2中的Observable#asObservable()在哪里?

我想观察一下BehaviourSubject.在RxJava 1中,我调用了asObservable(),现在已经不见了.

我找到publish()但它返回可连接,我不想这样做.

如何将行为主题转换为RxJava 2中的可观察对象?

java reactivex

7
推荐指数
2
解决办法
2456
查看次数

RxPy:在(慢)扫描执行之间排序热观察

TL; DR我正在寻求帮助来实现下面的大理石图.目的是尽可能地对未排序的值进行排序,而无需等待扫描执行之间的时间.

我不是要求全面实施.欢迎任何指导. 没消耗最小的大理石图 我有一个异步慢(强制测试目的)扫描无限热观察.这是相关代码:

thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()

external_obs \
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
    .scan(seed=State(0, None), accumulator=slow_scan_msg) \
    .subscribe(log, print, lambda: print("SLOW FINISHED"))

external_obs.connect()
thread.start()

def slow_scan_msg(state, msg):
    sleep(0.4)
    return state \
        ._replace(count = state.count + 1) \
        ._replace(last_msg = msg)
Run Code Online (Sandbox Code Playgroud)

这是完整版:https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd

这是当前输出(值是随机生成的):

emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 …
Run Code Online (Sandbox Code Playgroud)

python reactive-programming observable rx-py reactivex

7
推荐指数
1
解决办法
311
查看次数

我什么时候应该使用blockingGet?

我在工作中经常使用 RxJava,并且看到了一些调用返回 Observable 或 Single 的方法的示例,然后在其上调用blockingGet 以在不同的 . 我认为这可能是对图书馆和概念的滥用,但我可能是错的。我举一个小例子:

   public Observable<String> getStrings(){
     // return sg
   }

   public Observable<String> getNames(){
     // return names
   }

   public Observable<String> filterNamesInStrings() {
     List<String> names = getNames().toList().blockingGet();

     return getStrings().filter(str -> /*is str in names list*/)
   }   
Run Code Online (Sandbox Code Playgroud)

filterNamesInStrings可以通过以下方式解决:

   getNames()
    .toList()
    .flatMapObservable(names-> 
       getStrings().filter(str -> /*is str in names list*/)
Run Code Online (Sandbox Code Playgroud)

我的直觉是第二个解决方案更好,但我唯一的原因是我觉得使用blockingGet我们打破了可观察量的链条,失去了懒惰(我不确定Rx有多懒)但我做到了找不到任何东西来证明我的观点,也没有什么可以进一步解释第二个更好。另外,如果我是对的,除了快速测试之外,我没有看到任何其他阻止 get 的用例,这是真的吗?

我的问题:

  • 我的问题是否有效,或者实现之间的差异可以忽略不计?
  • 是否有任何解决方案比其他解决方案更好/更符合库的要求,如果是的话,为什么以及是否有充分的理由使用blockingGet?
  • (可选:你能给我推荐一本关于理解 ReactiveX 深度的好书吗?这样我就能得到对此类问题的解释,并且“良好实践”列表/书也会很方便)

rx-java reactivex

7
推荐指数
1
解决办法
1万
查看次数

RxJs:条件为真时缓冲事件,条件为假时传递事件

我创建了下面的 Observable 构造函数,它的工作原理与描述的一样。有谁知道使用 RxJs 附带的运算符是否有更简洁的方法来实现相同的行为?我正在查看接近所需行为的bufferToggle,但我需要在缓冲区关闭时传递发出的值。

函数说明:缓冲器所发射的source值,如果condition发射true,并穿过出射source如果值condition发射false。如果条件在 befalse之后发出true,则缓冲区按照接收到的顺序释放每个值。缓冲区被初始化为传递发出的source值,直到condition发出true.

function bufferIf<T>(condition: Observable<boolean>, source: Observable<T>): Observable<T> {
  return new Observable<T>(subscriber => {
    const subscriptions: Subscription[] = [];
    const buffer = [];
    let isBufferOpen = false;

    subscriptions.push(
      // handle source events
      source.subscribe(value => {
        // if buffer is open, or closed but buffer is still being 
        // emptied from …
Run Code Online (Sandbox Code Playgroud)

rxjs reactivex angular

6
推荐指数
1
解决办法
2182
查看次数

在 rxJava 中压缩超过 9 个 Observables

遇到需要做10-12个小的并行查询并合并结果的情况。但是如果有一个zip方法可以让你最多组合9个Observables,那怎么做更多我不明白。我尝试使用 zip 方法

public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)

但出现错误

java.lang.ClassCastException: java.util.ArrayList cannot be cast to io.reactivex.ObservableSource

尝试看起来像这样

List<Observable> list = new ArrayList<>();
list.add(orderRepository.getOne(54, "id"));
list.add(orderRepository.getTwo(54, "id"));
list.add(orderRepository.getThree(54, "id"));
list.add(orderRepository.getFour(54, "id"));
list.add(orderRepository.getFive());
list.add(orderRepository.getSix(54, "id"));
list.add(orderRepository.getSeven(54, "id"));
list.add(orderRepository.getEight());
list.add(orderRepository.getNine());
list.add(orderRepository.getTen(54, "id"));
list.add(orderRepository.getEleven(54, "id"));
Observable.fromIterable(list);

return Observable.zip(list,
        new Function<Object[], OrderModel>() {
            @Override
            public OrderModel apply(Object[] objects) throws Exception {
                Logger.trace("objects = ", objects);
                return new OrderModel();
            }
        });
Run Code Online (Sandbox Code Playgroud)

请举例说明如何在 Java …

android observable kotlin rx-java reactivex

6
推荐指数
2
解决办法
3823
查看次数