我正在对 Quarkus 中开发的一个简单应用程序进行负载测试。应用程序将 http 请求代理到另一个 http 服务。该应用程序使用org.eclipse.microprofile.reactive.messaging.Emitter和org.eclipse.microprofile.reactive.messaging.Channel。
如果我将请求率提高到 300 请求/秒,则会出现以下错误。我试图了解错误SRMSG00034:发出项目的下游请求不足以及如何解决它。任何帮助,将不胜感激。
2021-03-10 06:43:47,678 ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (executor-thread-100) HTTP Request to /events failed, error id: cb6577a7-0cd6-4790-a5ea-5ccd73a088fc-289: java.lang.IllegalStateException: SRMSG00034: Insufficient downstream requests to emit item
at io.smallrye.reactive.messaging.extension.ThrowingEmitter.emit(ThrowingEmitter.java:60)
at io.smallrye.reactive.messaging.extension.AbstractEmitter.emit(AbstractEmitter.java:146)
at io.smallrye.reactive.messaging.extension.EmitterImpl.send(EmitterImpl.java:29)
Run Code Online (Sandbox Code Playgroud) 我想使用这个扩展:[Quarkus Smallrye Reactive Messaging Kafka]
但在我的应用程序中,主题的名称是事先未知的,它是根据运行时从用户收到的消息指定的。如何在没有注释的情况下以编程方式指定主题名称和与主题相关的设置?(仅用于向 Kafka 发送消息 -> Produce)
@ApplicationScoped
public class PriceGenerator {
private Random random = new Random();
// Don't want to use this
// "generated-price" not known at build time
@Outgoing("generated-price")
public Multi<Integer> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
.onOverflow().drop()
.map(tick -> random.nextInt(100));
}
}
Run Code Online (Sandbox Code Playgroud)
或者这些配置应该在运行时以编程方式设置
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
Run Code Online (Sandbox Code Playgroud)
因为不认识路,所以使用了原生的Kafka驱动
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
Run Code Online (Sandbox Code Playgroud)
Properties props = new Properties();
props.put("bootstrap.servers", "85.93.89.115:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new …Run Code Online (Sandbox Code Playgroud) java apache-kafka quarkus smallrye-reactive-messaging smallrye
在服务器发送电子邮件后,我试图设置一个简单的成功/失败响应。
但是,即使在尝试了许多变体数小时后,我仍然没有得到正确的响应。
仅给出接受的响应的示例代码在这里:
@GET
@Path("/async")
public CompletionStage<Response> sendASimpleEmailAsync() {
return reactiveMailer.send(
Mail.withText("to@acme.org", "A reactive email from quarkus", "This is my body"))
.subscribeAsCompletionStage()
.thenApply(x -> Response.accepted().build());
}
Run Code Online (Sandbox Code Playgroud)
但是,当邮件没有成功发送时,我想在这里提供另一个回复。我试过的是这个(但这是一个没有成功的 Uni 演员表):
@GET
@Path("/async")
public Uni<Void> sendASimpleEmailAsync() {
final Mail mailToBeSent = Mail.withText("to@acme.org", "A reactive email from quarkus", "This is my body");
return (Uni<Void>) reactiveMailer.send(mailToBeSent)
.then( response -> {
if (response == null) {
return Response.accepted();
}
});
}
Run Code Online (Sandbox Code Playgroud)
控制台输出(由于密码错误而未发送邮件时):
[ERROR] Failed to execute goal io.quarkus:quarkus-maven-plugin:1.5.1.Final:dev (default-cli) …Run Code Online (Sandbox Code Playgroud) 我正在尝试学习在 Quarkus 框架上使用 ReactiveMongoClient。
我以 Uni> 的身份发送回复部分成功
@GET
@Path("/unpaginated")
public Uni<List<Staff>> unpaginatedStaffList() {
return staffService.getStaffResponse();
}
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试获取其他类(StaffResponse)的对象以包含用于分页的 Link 对象时,我没有获得任何 Staff 记录。(现在我已经硬编码了分页链接)
@GET
@Path("/paginated")
public StaffResponse paginatedStaffList() {
List<Link> links = LinkService.getLinks("/staff?page=2&limit=20", "next");
Uni<List<Staff>> staff = (Uni<List<Staff>>) staffService.getStaffResponse();
return new StaffResponse(links, staff);
}
Run Code Online (Sandbox Code Playgroud)
响应中的“工作人员”为空。
MongoClient 正在返回员工列表,看起来 Response 对象没有获取列表。尝试阅读 SmallRye Mutiny 文档 - 无法解决。
请帮忙。
我已将代码提交到:https://github.com/doepradhan/staffApi 和示例 json 数据文件(https://github.com/doepradhan/staffApi/blob/master/sample-staff-data.json)
非常感谢您的热心帮助。
我在Quarks 应用程序中使用 Smallrye Mutiniy 反应库,因为 Quarks 应用程序本身支持它。
我正在尝试为服务类编写单元测试。我不确定如何为返回Uni / Multi的方法编写单元测试。
一个方法返回Uni<String>
public Uni<String> hello(final String name) {
final String message = "Hello " + name;
return Uni.createFrom().item(message);
}
Run Code Online (Sandbox Code Playgroud)
上述方法的实现单元
@Test
void testHello() {
final Uni<String> casePass = hello("Ram");
// assertion passes and all good with this.
casePass.subscribe().with(message -> Assertions.assertEquals("Hello Ram", message));
final Uni<String> caseFail = hello("Ravan");
// It is expected to fail the assertion, and it does. But the test is not failing, instead aseertion …Run Code Online (Sandbox Code Playgroud) 目前我有一个 Quarkus 应用程序,它从 Kafka 主题消费并在另一个 Kafka 主题上生成。它使用 SmallRye 反应式消息传递。效果很好。由于外部更改,要生成的主题和要使用的主题将位于不同集群上的 Kafka 服务器上(并且不应该/不能组合在一个集群中)。
在应用程序配置(yaml)中我们设置Kafka服务器(broker):
kafka:
bootstrap:
servers: localhost:9092
Run Code Online (Sandbox Code Playgroud)
在这里添加服务器没有帮助,然后它尝试将数据传播到代理上,这不是我的意图。
是否可以连接到多个集群(也许为每个主题设置一个服务器)?在互联网上找不到任何相关内容,无论是Quarkus 文档还是SmallRye 文档。
quarkus ×6
smallrye-reactive-messaging ×6
java ×3
apache-kafka ×2
mutiny ×2
smallrye ×2
rest ×1
resteasy ×1