ath*_*hor 59 java delay rx-java
我正在努力实现我认为在Rx中相当简单的东西.
我有一个项目列表,我想让每个项目都延迟发出.
似乎Rx delay()操作符只是按指定的延迟而不是每个单独的项目来移动所有项目的发射.
这是一些测试代码.它将列表中的项目分组.然后每个组应该在发射之前应用延迟.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.delay(50, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Run Code Online (Sandbox Code Playgroud)
结果是:
154ms
[5]
155ms
[2]
155ms
[1]
155ms
[3]
155ms
[4]
Run Code Online (Sandbox Code Playgroud)
但我期望看到的是这样的:
174ms
[5]
230ms
[2]
285ms
[1]
345ms
[3]
399ms
[4]
Run Code Online (Sandbox Code Playgroud)
我究竟做错了什么?
iag*_*een 59
一种方法是使用zip将observable和Intervalobservable 组合起来来延迟输出.
Observable.zip(Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Run Code Online (Sandbox Code Playgroud)
Mag*_*nus 49
最简单的方法似乎只是使用concatMap并将每个项目包装在延迟的Obserable中.
long startTime = System.currentTimeMillis();
Observable.range(1, 5)
.concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
.doOnNext(i-> System.out.println(
"Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
.toCompletable().await();
Run Code Online (Sandbox Code Playgroud)
打印:
Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
Run Code Online (Sandbox Code Playgroud)
Min*_*amy 36
只需共享一个简单的方法,以一个间隔发出集合中的每个项目:
Observable.just(1,2,3,4,5)
.zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
每个项目将每500毫秒发出一次
Tim*_*Tim 11
对于kotlin用户,我为'zip with interval'方法编写了扩展功能
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
Observable.zip(
this,
Observable.interval(interval, timeUnit),
BiFunction { item, _ -> item }
)
Run Code Online (Sandbox Code Playgroud)
它的工作方式相同,但是可重用。例:
Observable.range(1, 5)
.delayEach(1, TimeUnit.SECONDS)
Run Code Online (Sandbox Code Playgroud)
我认为这正是你所需要的。看看:
long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
.timestamp(TimeUnit.MILLISECONDS)
.subscribe(emitTime -> {
System.out.println(emitTime.time() - startTime);
});
Run Code Online (Sandbox Code Playgroud)
在发出的每个项目之间引入延迟很有用:
List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
Observable.fromIterable(letters)
.concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
.take(1)
.map(second -> item))
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
更多好的选择请访问https://github.com/ReactiveX/RxJava/issues/3505
| 归档时间: |
|
| 查看次数: |
36076 次 |
| 最近记录: |