是否可以使用 Project Reactor 等待事件而不阻塞线程?

Jas*_*kes 5 java multithreading project-reactor

Project Reactor 是否可以在单声道中等待事件/条件,而无需为每个单声道使用阻塞线程?有了 a,CompletableFuture我可以完成这样的事情,但我不知道如何使用 Project Reactor 来做到这一点。

我的问题是我需要将请求与响应关联起来。响应时间差异很大,有些甚至永远不会得到回复和超时。在客户端,每个请求的阻塞线程不是问题,但由于这是一个服务器应用程序,我不想最终为每个请求生成一个阻塞等待响应的线程。

API 看起来像这样:

Mono<Response> doRequest(Mono<Request> request);
Run Code Online (Sandbox Code Playgroud)

由于我不知道如何使用 Reactor 来做到这一点,我将解释如何使用 a 来做到这一点,CompletableFuture以澄清我正在寻找的内容。API 看起来像这样:

CompletableFuture<Response> doRequest(Request request);
Run Code Online (Sandbox Code Playgroud)

当调用者调用时,会向服务器发出请求,其中包含此方法生成的相关 ID。返回调用者 aCompletableFuture并且该方法将对此的引用存储CompletableFuture在映射中,并以相关 ID 作为键。

还有一个线程(池),用于接收服务器的所有响应。CompletableFuture当它收到响应时,它会从响应中获取相关 ID,并使用它在映射中查找原始请求(即 )并complete(response);调用它。

在此实现中,您不需要每个请求都有一个阻塞线程。这基本上更像是一种 Vert.X/Netty 的思维方式?我想知道如何使用 Project Reactor 来实现这样的事情(如果可能的话)。

编辑 2019 年 7 月 25 日:

根据评论中的要求澄清我所得到的内容,下面是我如何使用CompleteableFuture's 实现这一点的示例。

我还注意到我犯了一个可能相当令人困惑的错误:在CompletableFuture示例中我传递了一个Mono作为参数。这应该只是一个“正常”的争论。我很抱歉,我希望我没有让人们对此感到太困惑。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class NonBlockingCorrelatingExample {

    /**
     * This example shows how to implement correlating requests with responses without needing a (sleeping)
     * thread per request to wait for the response with the use of {@link CompletableFuture}'s.
     *
     * So the main feat of this example is that there is always a fixed (small) number of threads used even if one
     * would fire a thousands requests.
     */
    public static void main(String[] args) throws Exception {

        RequestResponseService requestResponseService = new RequestResponseService();

        Request request = new Request();
        request.correlationId = 1;
        request.question = "Do you speak Spanish?";

        CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
        responseFuture.whenComplete((response, throwable) -> System.out.println(response.answer));

        // The blocking call here is just so the application doesn't exit until the demo is completed.
        responseFuture.get();
    }

    static class RequestResponseService {

        /** The key in this map is the correlation ID. */
        private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses =  new ConcurrentHashMap<>();

        CompletableFuture<Response> doRequest(Request request) {
            Response response = new Response();
            response.correlationId = request.correlationId;
            CompletableFuture<Response> reponseFuture = new CompletableFuture<>();
            responses.put(response.correlationId, reponseFuture);

            doNonBlockingFireAndForgetRequest(request);

            return reponseFuture;
        }

        private void doNonBlockingFireAndForgetRequest(Request request) {
            // In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
            // Right now we will just make a call which will simulate a response message coming in after a while.
            simulateResponses();
        }

        private void processResponse(Response response) {
            // There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
            // in a response topic and calls this method to handle those messages.
            CompletableFuture<Response> responseFuture = responses.get(response.correlationId);
            responseFuture.complete(response);
        }

        void simulateResponses() {
            // This is just to make the example work. Not part of the example.
            new Thread(() -> {
                try {
                    // Simulate a delay.
                    Thread.sleep(10_000);

                    Response response = new Response();
                    response.correlationId = 1;
                    response.answer = "Si!";

                    processResponse(response);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    static class Request {
        long correlationId;
        String question;
    }

    static class Response {
        long correlationId;
        String answer;
    }

}
Run Code Online (Sandbox Code Playgroud)

Ale*_*kin 4

对的,这是可能的。您可以使用reactor.core.publisher.Mono#create方法来实现它

对于你的例子:

public static void main(String[] args) throws Exception {
    RequestResponseService requestResponseService = new RequestResponseService();

    Request request = new Request();
    request.correlationId = 1;
    request.question = "Do you speak Spanish?";


    Mono<Request> requestMono = Mono.just(request)
            .doOnNext(rq -> System.out.println(rq.question));
    requestResponseService.doRequest(requestMono)
            .doOnNext(response -> System.out.println(response.answer))
            // The blocking call here is just so the application doesn't exit until the demo is completed.
            .block();
}

static class RequestResponseService {
    private final ConcurrentHashMap<Long, Consumer<Response>> responses =
            new ConcurrentHashMap<>();

    Mono<Response> doRequest(Mono<Request> request) {
        return request.flatMap(rq -> doNonBlockingFireAndForgetRequest(rq)
                .then(Mono.create(sink -> responses.put(rq.correlationId, sink::success))));
    }

    private Mono<Void> doNonBlockingFireAndForgetRequest(Request request) {
        return Mono.fromRunnable(this::simulateResponses);
    }

    private void processResponse(Response response) {
        responses.get(response.correlationId).accept(response);
    }

    void simulateResponses() {
        // This is just to make the example work. Not part of the example.
        new Thread(() -> {
            try {
                // Simulate a delay.
                Thread.sleep(10_000);

                Response response = new Response();
                response.correlationId = 1;
                response.answer = "Si!";

                processResponse(response);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
Run Code Online (Sandbox Code Playgroud)