我正在调查Kafka 9作为业余爱好项目,并完成了一些"Hello World"类型示例.
我必须考虑基于请求响应消息传递的Real World Kafka应用程序,更具体地说,如何将Kafka请求消息链接到其响应消息.
我正在考虑使用生成的UUID作为请求消息密钥,并将此请求UUID用作关联的响应消息密钥.与WebSphere MQ具有消息关联ID的机制大致相同.
我的结束2结束过程将是.
1).Kafka客户端生成随机UUID并发送单个Kafka请求消息.2).服务器将使用此请求消息提取并存储请求UUID值3).使用消息有效内容完成业务流程.4).响应响应消息,该消息使用来自请求消息的存储的UUID值作为响应消息Key.5).Kafka客户端轮询响应主题,直到它超时或检索具有原始请求UUID值的消息.
我关注的是Kafka Consumer轮询将从响应主题中删除其他客户端消息,并增加偏移量,使其他客户端失败.
我是否尝试在一个用例中应用Kafka它从未设计过?
是否可以在Kafka中实现请求/响应消息传递?
Project Reactor 是否可以在单声道中等待事件/条件,而无需为每个单声道使用阻塞线程?有了 a,CompletableFuture我可以完成这样的事情,但我不知道如何使用 Project Reactor 来做到这一点。
我的问题是我需要将请求与响应关联起来。响应时间差异很大,有些甚至永远不会得到回复和超时。在客户端,每个请求的阻塞线程不是问题,但由于这是一个服务器应用程序,我不想最终为每个请求生成一个阻塞等待响应的线程。
API 看起来像这样:
Mono<Response> doRequest(Mono<Request> request);
Run Code Online (Sandbox Code Playgroud)
由于我不知道如何使用 Reactor 来做到这一点,我将解释如何使用 a 来做到这一点,CompletableFuture以澄清我正在寻找的内容。API 看起来像这样:
CompletableFuture<Response> doRequest(Request request);
Run Code Online (Sandbox Code Playgroud)
当调用者调用时,会向服务器发出请求,其中包含此方法生成的相关 ID。返回调用者 aCompletableFuture并且该方法将对此的引用存储CompletableFuture在映射中,并以相关 ID 作为键。
还有一个线程(池),用于接收服务器的所有响应。CompletableFuture当它收到响应时,它会从响应中获取相关 ID,并使用它在映射中查找原始请求(即 )并complete(response);调用它。
在此实现中,您不需要每个请求都有一个阻塞线程。这基本上更像是一种 Vert.X/Netty 的思维方式?我想知道如何使用 Project Reactor 来实现这样的事情(如果可能的话)。
编辑 2019 年 7 月 25 日:
根据评论中的要求澄清我所得到的内容,下面是我如何使用CompleteableFuture's 实现这一点的示例。
我还注意到我犯了一个可能相当令人困惑的错误:在CompletableFuture示例中我传递了一个Mono作为参数。这应该只是一个“正常”的争论。我很抱歉,我希望我没有让人们对此感到太困惑。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
class NonBlockingCorrelatingExample {
/**
* This example shows how to implement correlating …Run Code Online (Sandbox Code Playgroud)