And*_*ski 4 java project-reactor
我有三个与Project Reactor有关的问题,下面将问他们。从我拥有的代码开始(它将简化以更容易理解该问题)。
Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
return Mono.just("hello")
.compose(monostr -> monostr
.doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1)
.doOnCancel(() -> System.out.println("cancelled")) //(2)
.then(callback::apply)
.timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
);
}
Run Code Online (Sandbox Code Playgroud)
并测试:
@Test
public void testDoWithSession2() throws Exception {
Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
System.out.println("do some long timed work");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("work has completed");
return str.length();
});
StepVerifier.create(doWithSession(fun1,1000))
.verifyError(TimeoutException.class);
}
Run Code Online (Sandbox Code Playgroud)
所以和问题:
fun1并立即返回错误?(也许我做错了,但它看起来不是在超时后而是在所有回调调用之后返回错误)doOnSuccess和同时doOnCancel调用?(我希望将调用(1)或(2),但不会两者都调用)Mono.just("hello")正在获取连接;callback我做与连接东西,得到一些结果(Mono<Integer>在我的情况);1)如您所知,请使用.publishOn(Schedulers.single())。这将确保可调用对象在另一个线程上被调用,并且仅阻塞该线程。另外,它将允许可调用对象被取消。
2)您的连锁顺序很重要。您将其.doOnSuccess放在的开头compose(顺便说一句,您实际上不需要该特定示例,除非您要提取该组合函数以供以后重用)。因此,这意味着它Mono.just基本上从基础上获取通知,并在查询源后立即运行,甚至在您进行处理之前...与相同doOnCancel。取消来自timeout触发...
3)有一家工厂可以根据资源创建序列,并确保清理资源:Mono.using。所以看起来像这样:
public <T> Mono<T> doWithConnection(Function<String, Mono<T>> callback, long timeout) {
return Mono.using(
//the resource supplier:
() -> {
System.out.println("connection acquired");
return "hello";
},
//create a Mono out of the resource. On any termination, the resource is cleaned up
connection -> Mono.just(connection)
//the blocking callable needs own thread:
.publishOn(Schedulers.single())
//execute the callable and get result...
.then(callback::apply)
//...but cancel if it takes too long
.timeoutMillis(timeout)
//for demonstration we'll log when timeout triggers:
.doOnError(TimeoutException.class, e -> System.out.println("timed out")),
//the resource cleanup:
connection -> System.out.println("cleaned up " + connection));
}
Run Code Online (Sandbox Code Playgroud)
返回Mono<T>可调用对象的T值的a。在生产代码中,您需要订阅它来处理价值。在测试中,StepVerifier.create()将订阅您。
让我们通过长期运行的任务来证明这一点,并查看其输出:
@Test
public void testDoWithSession2() throws Exception {
Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
System.out.println("start some long timed work");
//for demonstration we'll print some clock ticks
for (int i = 1; i <= 5; i++) {
try {
Thread.sleep(1000);
System.out.println(i + "s...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("work has completed");
return str.length();
});
//let two ticks show up
StepVerifier.create(doWithConnection(fun1,2100))
.verifyError(TimeoutException.class);
}
Run Code Online (Sandbox Code Playgroud)
输出:
connection acquired
start some long timed work
1s...
2s...
timed out
cleaned up hello
Run Code Online (Sandbox Code Playgroud)
如果超时超过5000,我们将得到以下结果。(存在断言错误,因为StepVerifier会超时):
connection acquired
start some long timed work
1s...
2s...
3s...
4s...
5s...
work has completed
cleaned up hello
java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(TimeoutException); actual: onNext(5)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4503 次 |
| 最近记录: |