Jas*_*kes 5 java multithreading project-reactor
Project Reactor 是否可以在单声道中等待事件/条件,而无需为每个单声道使用阻塞线程?有了 a,CompletableFuture我可以完成这样的事情,但我不知道如何使用 Project Reactor 来做到这一点。
我的问题是我需要将请求与响应关联起来。响应时间差异很大,有些甚至永远不会得到回复和超时。在客户端,每个请求的阻塞线程不是问题,但由于这是一个服务器应用程序,我不想最终为每个请求生成一个阻塞等待响应的线程。
API 看起来像这样:
Mono<Response> doRequest(Mono<Request> request);
Run Code Online (Sandbox Code Playgroud)
由于我不知道如何使用 Reactor 来做到这一点,我将解释如何使用 a 来做到这一点,CompletableFuture以澄清我正在寻找的内容。API 看起来像这样:
CompletableFuture<Response> doRequest(Request request);
Run Code Online (Sandbox Code Playgroud)
当调用者调用时,会向服务器发出请求,其中包含此方法生成的相关 ID。返回调用者 aCompletableFuture并且该方法将对此的引用存储CompletableFuture在映射中,并以相关 ID 作为键。
还有一个线程(池),用于接收服务器的所有响应。CompletableFuture当它收到响应时,它会从响应中获取相关 ID,并使用它在映射中查找原始请求(即 )并complete(response);调用它。
在此实现中,您不需要每个请求都有一个阻塞线程。这基本上更像是一种 Vert.X/Netty 的思维方式?我想知道如何使用 Project Reactor 来实现这样的事情(如果可能的话)。
编辑 2019 年 7 月 25 日:
根据评论中的要求澄清我所得到的内容,下面是我如何使用CompleteableFuture's 实现这一点的示例。
我还注意到我犯了一个可能相当令人困惑的错误:在CompletableFuture示例中我传递了一个Mono作为参数。这应该只是一个“正常”的争论。我很抱歉,我希望我没有让人们对此感到太困惑。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
class NonBlockingCorrelatingExample {
/**
* This example shows how to implement correlating requests with responses without needing a (sleeping)
* thread per request to wait for the response with the use of {@link CompletableFuture}'s.
*
* So the main feat of this example is that there is always a fixed (small) number of threads used even if one
* would fire a thousands requests.
*/
public static void main(String[] args) throws Exception {
RequestResponseService requestResponseService = new RequestResponseService();
Request request = new Request();
request.correlationId = 1;
request.question = "Do you speak Spanish?";
CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
responseFuture.whenComplete((response, throwable) -> System.out.println(response.answer));
// The blocking call here is just so the application doesn't exit until the demo is completed.
responseFuture.get();
}
static class RequestResponseService {
/** The key in this map is the correlation ID. */
private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses = new ConcurrentHashMap<>();
CompletableFuture<Response> doRequest(Request request) {
Response response = new Response();
response.correlationId = request.correlationId;
CompletableFuture<Response> reponseFuture = new CompletableFuture<>();
responses.put(response.correlationId, reponseFuture);
doNonBlockingFireAndForgetRequest(request);
return reponseFuture;
}
private void doNonBlockingFireAndForgetRequest(Request request) {
// In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
// Right now we will just make a call which will simulate a response message coming in after a while.
simulateResponses();
}
private void processResponse(Response response) {
// There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
// in a response topic and calls this method to handle those messages.
CompletableFuture<Response> responseFuture = responses.get(response.correlationId);
responseFuture.complete(response);
}
void simulateResponses() {
// This is just to make the example work. Not part of the example.
new Thread(() -> {
try {
// Simulate a delay.
Thread.sleep(10_000);
Response response = new Response();
response.correlationId = 1;
response.answer = "Si!";
processResponse(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
static class Request {
long correlationId;
String question;
}
static class Response {
long correlationId;
String answer;
}
}
Run Code Online (Sandbox Code Playgroud)
对的,这是可能的。您可以使用reactor.core.publisher.Mono#create方法来实现它
对于你的例子:
public static void main(String[] args) throws Exception {
RequestResponseService requestResponseService = new RequestResponseService();
Request request = new Request();
request.correlationId = 1;
request.question = "Do you speak Spanish?";
Mono<Request> requestMono = Mono.just(request)
.doOnNext(rq -> System.out.println(rq.question));
requestResponseService.doRequest(requestMono)
.doOnNext(response -> System.out.println(response.answer))
// The blocking call here is just so the application doesn't exit until the demo is completed.
.block();
}
static class RequestResponseService {
private final ConcurrentHashMap<Long, Consumer<Response>> responses =
new ConcurrentHashMap<>();
Mono<Response> doRequest(Mono<Request> request) {
return request.flatMap(rq -> doNonBlockingFireAndForgetRequest(rq)
.then(Mono.create(sink -> responses.put(rq.correlationId, sink::success))));
}
private Mono<Void> doNonBlockingFireAndForgetRequest(Request request) {
return Mono.fromRunnable(this::simulateResponses);
}
private void processResponse(Response response) {
responses.get(response.correlationId).accept(response);
}
void simulateResponses() {
// This is just to make the example work. Not part of the example.
new Thread(() -> {
try {
// Simulate a delay.
Thread.sleep(10_000);
Response response = new Response();
response.correlationId = 1;
response.answer = "Si!";
processResponse(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
Run Code Online (Sandbox Code Playgroud)