如何停止和恢复Observable.interval发出滴答声

Jan*_*ers 33 java rx-java rx-android

这将每5秒发出一次滴答声.

Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
            .subscribe(tick -> Log.d(TAG, "tick = "+tick));
Run Code Online (Sandbox Code Playgroud)

要停止它你可以使用

Schedulers.shutdown();
Run Code Online (Sandbox Code Playgroud)

但随后所有调度程序停止,以后无法恢复计时.我怎样才能停止并恢复"优雅地"发出的声音?

And*_*dEx 36

这是一个可能的解决方案:

class TickHandler {

    private AtomicLong lastTick = new AtomicLong(0L);
    private Subscription subscription;

    void resume() {
        System.out.println("resumed");
        subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                                 .map(tick -> lastTick.getAndIncrement())
                                 .subscribe(tick -> System.out.println("tick = " + tick));
    }

    void stop() {
        if (subscription != null && !subscription.isUnsubscribed()) {
            System.out.println("stopped");
            subscription.unsubscribe();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • @DanChaltiel一般来说,当多个线程写入依赖于字段中先前值的字段值时,`volatile`提供的保证不够强,并且需要通过可用的Java并发来同步或排列访问权限蜜蜂.更多关于`volatile` http://tutorials.jenkov.com/java-concurrency/volatile.html#when-is-volatile-enough. (2认同)

Art*_*ski 16

前段时间,我也在寻找一种RX"计时器"解决方案,但没有达到我的期望.所以你可以找到我自己的解决方案:

AtomicLong elapsedTime = new AtomicLong();
AtomicBoolean resumed = new AtomicBoolean();
AtomicBoolean stopped = new AtomicBoolean();

public Flowable<Long> startTimer() { //Create and starts timper
    resumed.set(true);
    stopped.set(false);
    return Flowable.interval(1, TimeUnit.SECONDS)
            .takeWhile(tick -> !stopped.get())
            .filter(tick -> resumed.get())
            .map(tick -> elapsedTime.addAndGet(1000));
}

public void pauseTimer() {
    resumed.set(false);
}

public void resumeTimer() {
    resumed.set(true);
}

public void stopTimer() {
    stopped.set(true);
}

public void addToTimer(int seconds) {
    elapsedTime.addAndGet(seconds * 1000);
}
Run Code Online (Sandbox Code Playgroud)

  • 我不敢相信没有人提出这个答案。接受的答案在每次暂停后创建一个新的 Observable 并且不容纳 lambdas 中的最终字段(您必须使用类字段)。这是一个更好的解决方案。 (3认同)
  • 这是唯一可以在生产代码中使用的答案。 (2认同)

She*_*eng 9

val switch = new java.util.concurrent.atomic.AtomicBoolean(true)
val tick = new java.util.concurrent.atomic.AtomicLong(0L)

val suspendableObservable = 
  Observable.
    interval(5 seconds).
    takeWhile(_ => switch.get()).
    repeat.
    map(_ => tick.incrementAndGet())
Run Code Online (Sandbox Code Playgroud)

您可以设置switchfalse暂停滴答并true恢复它.