发出的每个列表项的RxJava延迟

ath*_*hor 59 java delay rx-java

我正在努力实现我认为在Rx中相当简单的东西.

我有一个项目列表,我想让每个项目都延迟发出.

似乎Rx delay()操作符只是按指定的延迟而不是每个单独的项目来移动所有项目的发射.

这是一些测试代码.它将列表中的项目分组.然后每个组应该在发射之前应用延迟.

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();
Run Code Online (Sandbox Code Playgroud)

结果是:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]
Run Code Online (Sandbox Code Playgroud)

但我期望看到的是这样的:

174ms
[5]

230ms
[2]

285ms
[1]

345ms
[3]

399ms
[4]
Run Code Online (Sandbox Code Playgroud)

我究竟做错了什么?

iag*_*een 59

一种方法是使用zip将observable和Intervalobservable 组合起来来延迟输出.

Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();
Run Code Online (Sandbox Code Playgroud)

  • 如果您以低于50毫秒的速率获取observable发出项目,则此方法无效.`Observable.interval()`将每50毫秒发出一个项目.如果您的分组列表中没有匹配的项目,`zip()`运算符将缓冲这些.然后,当一个组被发出时,zip会立即将它与来自可观察区间的项目组合,并立即将它发送到你的`doOnNext()`. (7认同)
  • 谢谢!我认为延迟运算符不适合我想要的方式使用。此解决方案有效:) (2认同)

Mag*_*nus 49

最简单的方法似乎只是使用concatMap并将每个项目包装在延迟的Obserable中.

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
        .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i-> System.out.println(
                "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
        .toCompletable().await();
Run Code Online (Sandbox Code Playgroud)

打印:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
Run Code Online (Sandbox Code Playgroud)


Min*_*amy 36

只需共享一个简单的方法,以一个间隔发出集合中的每个项目:

Observable.just(1,2,3,4,5)
    .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

每个项目将每500毫秒发出一次


Tim*_*Tim 11

对于kotlin用户,我为'zip with interval'方法编写了扩展功能

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
    Observable.zip(
        this, 
        Observable.interval(interval, timeUnit), 
        BiFunction { item, _ -> item }
    )
Run Code Online (Sandbox Code Playgroud)

它的工作方式相同,但是可重用。例:

Observable.range(1, 5)
    .delayEach(1, TimeUnit.SECONDS)
Run Code Online (Sandbox Code Playgroud)


Vla*_*lad 5

我认为这正是你所需要的。看看:

long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
                .timestamp(TimeUnit.MILLISECONDS)
                .subscribe(emitTime -> {
                    System.out.println(emitTime.time() - startTime);
                });
Run Code Online (Sandbox Code Playgroud)


yai*_*eno 5

在发出的每个项目之间引入延迟很有用:

List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));

Observable.fromIterable(letters)
                .concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
                        .take(1)
                        .map(second -> item))
                .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

更多好的选择请访问https://github.com/ReactiveX/RxJava/issues/3505