立即发送第一个项目,"debounce"以下项目

tom*_*ozb 35 rx-java rx-android

考虑以下用例:

  • 需要尽快交付第一件物品
  • 需要在1秒超时后对事件进行去抖动

我最终实现了自定义运算符,OperatorDebounceWithTime然后像这样使用它

.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))
Run Code Online (Sandbox Code Playgroud)

CustomOperatorDebounceWithTime立即发送第一个项目,然后使用OperatorDebounceWithTime操作员的逻辑去除后期项目.

是否有更简单的方法来实现所描述的行为?让我们跳过compose运算符,但它没有解决问题.我正在寻找一种方法来实现这一点,而无需实现自定义运算符.

Lor*_*nMK 34

更新:
来自@ lopar的评论更好的方法是:

Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))
Run Code Online (Sandbox Code Playgroud)

会这样的工作:

String[] items = {"one", "two", "three", "four", "five", "six", "seven", "eight"};
Observable<String> myObservable = Observable.from(items);
Observable.concat(myObservable.first(), myObservable.skip(1).debounce(1, TimeUnit.SECONDS))
    .subscribe(s -> System.out.println(s));
Run Code Online (Sandbox Code Playgroud)

  • 您还可以使用`publish`来防止进行双重订阅(在冷可观察的情况下,两次进行工作,并且在热情况下可能会不同步):`Observable.from(items).publish(publishedItems - > publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1,TimeUnit.SECONDS)))`另请注意,如果`Observable`为空,`first`将会爆炸,所以除非这是期望的行为,你可能想要`limit(1)`. (6认同)
  • 在版本2中使用.take(1)而不是.limit(1) (5认同)
  • 我在等待@lopar提出的解决方案.我认为第二个可能在某些情况下不起作用.`myObservable.skip(1)`可以在第一个项目发布后订阅,因此它将跳过第二个元素. (2认同)
  • @LordRaydenMK,你原来的答案很好,不用担心成为初学者!:)很高兴我可以帮助调整一下. (2认同)
  • 使用发布表单时,不应使用.skip(1)。因为可观察对象使用相同的订阅,所以它将从同一位置继续。它应该只是` (2认同)
  • 我之前用过这个答案。而我刚刚意识到是错误的。问题是去抖动隐藏了问题。不要写skip(1),否则每次都会丢失第二项。[完整答案在这里](/sf/answers/3464818801/) (2认同)

Law*_*oot 17

@LortRaydenMK和@lopar的答案是最好的,但我想提出一些其他的建议,以防它碰巧更适合你或类似情况的人.

有一个变体debounce()需要一个函数来决定去抖这个特定项目多长时间.它通过返回在一段时间后完成的observable来指定它.您的功能可以返回empty()第一个项目和timer()其余项目.像(未经测试)的东西:

String[] items = {"one", "two", "three", "four", "five", "six"};
Observable.from(items)
    .debounce(item -> item.equals("one")
            ? Observable.empty()
            : Observable.timer(1, TimeUnit.SECONDS));
Run Code Online (Sandbox Code Playgroud)

诀窍是这个函数必须知道哪个项目是第一个.你的序列可能知道.如果没有,你可能不得不zip()range()或什么的.在这种情况下更好的是在另一个答案中使用该解决方案.


Adr*_*ker 7

使用RxJava 2.0的简单解决方案,从RxJS 的相同问题的答案翻译而来,结合了throttleFirst和debounce,然后删除重复项.

private <T> ObservableTransformer<T, T> debounceImmediate() {
    return observable  -> observable.publish(p -> 
        Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS), 
            p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
} 

@Test
public void testDebounceImmediate() {
    Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
        .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
        .doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
            .compose(debounceImmediate())
            .blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}
Run Code Online (Sandbox Code Playgroud)

使用limit()或take()的方法似乎不能处理长期存在的数据流,我可能希望不断观察,但仍然会立即对一段时间内看到的第一个事件采取行动.


Ric*_*mer 6

使用该debounce函数的版本并以这种方式实现该函数:

    .debounce(new Func1<String, Observable<String>>() {
        private AtomicBoolean isFirstEmission = new AtomicBoolean(true);
        @Override
        public Observable<String> call(String s) {
             // note: standard debounce causes the first item to be
             // delayed by 1 second unnecessarily, this is a workaround
             if (isFirstEmission.getAndSet(false)) {
                 return Observable.just(s);
             } else {
                 return Observable.just(s).delay(1, TimeUnit.SECONDS);
             }
        }
    })
Run Code Online (Sandbox Code Playgroud)

第一个项目立即发出.后续项目延迟一秒.如果延迟观察值在下一个项目到达之前没有终止,则会被取消,因此会实现预期的去抖动行为.


gsw*_*ski 6

Kotlin 扩展函数基于@lopar 的评论:

fun <T> Flowable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Flowable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}

fun <T> Observable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Observable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}
Run Code Online (Sandbox Code Playgroud)


Bra*_*bin 5

LordRaydenMK和洛帕尔的回答有一个问题:你总是输掉第二项。我认为以前没有人会解决这个问题,因为如果您进行了防抖操作,通常会发生很多事件,而第二种方法无论如何都会与防抖操作有关。永不丢失事件的正确方法是:

observable
    .publish(published ->
        published
            .limit(1)
            .concatWith(published.debounce(1, TimeUnit.SECONDS)));
Run Code Online (Sandbox Code Playgroud)

不用担心,您不会获得任何重复的事件。如果不确定,可以运行以下代码并亲自检查:

Observable.just(1, 2, 3, 4)
    .publish(published ->
        published
            .limit(1)
            .concatWith(published))
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)