这需要背压还是有更简单的方法?
例如在下面的代码中,我希望每 2 秒调用一次自旋函数。有时“旋转”可能需要比 2 秒间隔更长的时间来计算,在这种情况下,我不希望任何间隔排放排队。但在下面的代码中,他们确实排队。
在下面的代码中,前 4 个自旋函数调用需要 10 秒,其余需要 1 秒。因此,一旦函数变得更快, Flux.interval 排放就会“赶上”。但是,我不希望发生任何“追赶”
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
public class Test {
public static void main(String[] args) {
Iterator<Integer> secs = new Iterator<Integer>() {
private int num = 0;
@Override
public boolean hasNext() {
return true;
}
@Override
public Integer next() {
return num++ < 4 ? 10 : 1;
}
};
Flux.interval(Duration.ofSeconds(5))
.map(n -> {spin(secs.next()); return n;})
.doOnNext(n -> log("Processed " + n))
.blockLast();
}
private static void spin(int secs) {
log("Current job will take " + secs + " secs");
long sleepTime = secs*1000000000L; // convert to nanos
long startTime = System.nanoTime();
while ((System.nanoTime() - startTime) < sleepTime) {}
}
static void log(Object label) {
System.out.println((new Date()).toString() + "\t| " +Thread.currentThread().getName() + "\t| " + label);
}
}
Run Code Online (Sandbox Code Playgroud)
输出:注意“已处理”时间戳最初间隔 10 秒,但从作业 4 到作业 8,有一个我不想发生的“赶上”。我想在上次调用后不早于 2 秒执行
Thu Jun 01 17:16:23 EDT 2017 | parallel-1 | Current job will take 10 secs
Thu Jun 01 17:16:33 EDT 2017 | parallel-1 | Processed 0
Thu Jun 01 17:16:33 EDT 2017 | parallel-1 | Current job will take 10 secs
Thu Jun 01 17:16:43 EDT 2017 | parallel-1 | Processed 1
Thu Jun 01 17:16:43 EDT 2017 | parallel-1 | Current job will take 10 secs
Thu Jun 01 17:16:53 EDT 2017 | parallel-1 | Processed 2
Thu Jun 01 17:16:53 EDT 2017 | parallel-1 | Current job will take 10 secs
Thu Jun 01 17:17:03 EDT 2017 | parallel-1 | Processed 3
Thu Jun 01 17:17:03 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:04 EDT 2017 | parallel-1 | Processed 4
Thu Jun 01 17:17:04 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:05 EDT 2017 | parallel-1 | Processed 5
Thu Jun 01 17:17:05 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:06 EDT 2017 | parallel-1 | Processed 6
Thu Jun 01 17:17:06 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:07 EDT 2017 | parallel-1 | Processed 7
Thu Jun 01 17:17:07 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:08 EDT 2017 | parallel-1 | Processed 8
Thu Jun 01 17:17:08 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:09 EDT 2017 | parallel-1 | Processed 9
Thu Jun 01 17:17:13 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:14 EDT 2017 | parallel-1 | Processed 10
Thu Jun 01 17:17:18 EDT 2017 | parallel-1 | Current job will take 1 secs
Thu Jun 01 17:17:19 EDT 2017 | parallel-1 | Processed 11
Run Code Online (Sandbox Code Playgroud)
@Test
public void testInterval() throws Exception {
Flux.interval(Duration.ofSeconds(1))
.subscribe(new Subscriber<Long>() {
private Subscription subscription;
@Override
public void onSubscribe(final Subscription s) {
this.subscription = s;
s.request(1);
}
@Override
public void onNext(final Long aLong) {
// execute the operation that cold possibly take longer than the interval (1s):
try {
Thread.sleep(5000);
System.out.println(aLong);
} catch (InterruptedException e) {
}
// request another item when we're done:
subscription.request(1);
}
@Override
public void onError(final Throwable t) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(12000);
}
Run Code Online (Sandbox Code Playgroud)
这个想法是在长时间运行的任务完成后立即明确请求“下一个间隔”。
| 归档时间: |
|
| 查看次数: |
3434 次 |
| 最近记录: |