小编Jas*_*kes的帖子

Kafka是否支持请求响应消息传递

我正在调查Kafka 9作为业余爱好项目,并完成了一些"Hello World"类型示例.

我必须考虑基于请求响应消息传递的Real World Kafka应用程序,更具体地说,如何将Kafka请求消息链接到其响应消息.

我正在考虑使用生成的UUID作为请求消息密钥,并将此请求UUID用作关联的响应消息密钥.与WebSphere MQ具有消息关联ID的机制大致相同.

我的结束2结束过程将是.

1).Kafka客户端生成随机UUID并发送单个Kafka请求消息.2).服务器将使用此请求消息提取并存储请求UUID值3).使用消息有效内容完成业务流程.4).响应响应消息,该消息使用来自请求消息的存储的UUID值作为响应消息Key.5).Kafka客户端轮询响应主题,直到它超时或检索具有原始请求UUID值的消息.

我关注的是Kafka Consumer轮询将从响应主题中删除其他客户端消息,并增加偏移量,使其他客户端失败.

我是否尝试在一个用例中应用Kafka它从未设计过?

是否可以在Kafka中实现请求/响应消息传递?

mom apache-kafka kafka-consumer-api kafka-producer-api

19
推荐指数
1
解决办法
1万
查看次数

是否可以使用 Project Reactor 等待事件而不阻塞线程?

Project Reactor 是否可以在单声道中等待事件/条件,而无需为每个单声道使用阻塞线程?有了 a,CompletableFuture我可以完成这样的事情,但我不知道如何使用 Project Reactor 来做到这一点。

我的问题是我需要将请求与响应关联起来。响应时间差异很大,有些甚至永远不会得到回复和超时。在客户端,每个请求的阻塞线程不是问题,但由于这是一个服务器应用程序,我不想最终为每个请求生成一个阻塞等待响应的线程。

API 看起来像这样:

Mono<Response> doRequest(Mono<Request> request);
Run Code Online (Sandbox Code Playgroud)

由于我不知道如何使用 Reactor 来做到这一点,我将解释如何使用 a 来做到这一点,CompletableFuture以澄清我正在寻找的内容。API 看起来像这样:

CompletableFuture<Response> doRequest(Request request);
Run Code Online (Sandbox Code Playgroud)

当调用者调用时,会向服务器发出请求,其中包含此方法生成的相关 ID。返回调用者 aCompletableFuture并且该方法将对此的引用存储CompletableFuture在映射中,并以相关 ID 作为键。

还有一个线程(池),用于接收服务器的所有响应。CompletableFuture当它收到响应时,它会从响应中获取相关 ID,并使用它在映射中查找原始请求(即 )并complete(response);调用它。

在此实现中,您不需要每个请求都有一个阻塞线程。这基本上更像是一种 Vert.X/Netty 的思维方式?我想知道如何使用 Project Reactor 来实现这样的事情(如果可能的话)。

编辑 2019 年 7 月 25 日:

根据评论中的要求澄清我所得到的内容,下面是我如何使用CompleteableFuture's 实现这一点的示例。

我还注意到我犯了一个可能相当令人困惑的错误:在CompletableFuture示例中我传递了一个Mono作为参数。这应该只是一个“正常”的争论。我很抱歉,我希望我没有让人们对此感到太困惑。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class NonBlockingCorrelatingExample {

    /**
     * This example shows how to implement correlating …
Run Code Online (Sandbox Code Playgroud)

java multithreading project-reactor

5
推荐指数
1
解决办法
2310
查看次数