标签: mutiny

如何在 Quarkus 上测试 Mutiny 中的轮询模式?

我想测试https://smallrye.io/smallrye-mutiny/guides/polling中的一个简单的轮询示例,并将服务的数据轮询到 Kafka 流中。

这是我要测试的类的简化示例:

@ApplicationScoped
public class ExampleScheduler {

    @Inject
    @RestClient
    ExapleService service;


    @PostConstruct
    void init() {
        pollSource();
    }

    @Outgoing("sensor_data_out")
    Multi<String> pollSource() {
        Uni<String> stream = service.getString()
                .runSubscriptionOn(Infrastructure.getDefaultExecutor());

        return stream.repeat().withDelay(Duration.ofSeconds(3))
                .indefinitely();
    }
}
Run Code Online (Sandbox Code Playgroud)

这是测试类:

@QuarkusTest
class ExampleSchedulerTest {
    
    @Inject
    ExampleScheduler classToTest;

    @InjectMock
    ExampleService mockService;

    @BeforeEach
    void setUp() {
        when(mockService.getString()).thenReturn(Uni.createFrom().item("ANSWER"));
    }

    @Test
    void pollSource() {
        final Multi<String> stream = classToTest.pollSource();
        AssertSubscriber<String> subscriber = stream.subscribe().withSubscriber(AssertSubscriber.create(1));
        subscriber.assertCompleted()
                .assertItems("ANSWER");
    }
}
Run Code Online (Sandbox Code Playgroud)

我的实际例子的错误日志是:

我试图依靠 Quarkus 测试容器来提供 Kafka 实例

java.lang.AssertionError: No completion (or failure) …
Run Code Online (Sandbox Code Playgroud)

unit-testing vert.x apache-kafka quarkus mutiny

6
推荐指数
1
解决办法
1190
查看次数

Uni&lt;Void&gt; 如何在 REST 调用中获得失败或成功响应

在服务器发送电子邮件后,我试图设置一个简单的成功/失败响应。

但是,即使在尝试了许多变体数小时后,我仍然没有得到正确的响应。

仅给出接受响应示例代码在这里:

@GET
@Path("/async")
public CompletionStage<Response> sendASimpleEmailAsync() {
    return reactiveMailer.send(
            Mail.withText("to@acme.org", "A reactive email from quarkus", "This is my body"))
            .subscribeAsCompletionStage()
            .thenApply(x -> Response.accepted().build());
}
Run Code Online (Sandbox Code Playgroud)

但是,当邮件没有成功发送时,我想在这里提供另一个回复。我试过的是这个(但这是一个没有成功的 Uni 演员表):

@GET
@Path("/async")
public Uni<Void> sendASimpleEmailAsync() {
    final Mail mailToBeSent =  Mail.withText("to@acme.org", "A reactive email from quarkus", "This is my body");

    return (Uni<Void>) reactiveMailer.send(mailToBeSent)
            .then( response -> {
                if (response == null) {
                    return Response.accepted();
                }
            });
}
Run Code Online (Sandbox Code Playgroud)

控制台输出(由于密码错误而未发送邮件时):

[ERROR] Failed to execute goal io.quarkus:quarkus-maven-plugin:1.5.1.Final:dev (default-cli) …
Run Code Online (Sandbox Code Playgroud)

java resteasy quarkus smallrye-reactive-messaging mutiny

5
推荐指数
1
解决办法
1339
查看次数

两个 Quarkus 服务之间的非阻塞数据流(Vert.x 和 Java 中的 Mutiny)

更新!

在解决了一些与主要问题无关的问题后,我修复了示例代码中的小错误,主要问题仍然是关于服务之间的非阻塞流。

背景资料

我正在 Quarkus 下移植 Spring WebFlux 服务。该服务对多个庞大数据集运行长时间搜索,并在可用时以 Flux(文本/事件流)形式返回部分结果。

问题

现在,我尝试在 Quarkus 下将 Mutiny Multi 与 Vert.x 结合使用,但无法弄清楚消费者服务如何在不阻塞的情况下接收此流。

在所有示例中,消费者要么是 JS 前端页面,要么生产者的内容类型是 application/json,在将其发送到一个 JSON 对象之前,它似乎会一直阻塞,直到 Multi 完成(这在我的应用程序中没有任何意义)。

问题

  1. 如何使用 Mutiny 风格的 Vert.x WebClient 接收文本/事件流?
  2. 如果问题是 WebClient 无法接收连续流:在两个 Quarkus 服务之间传输数据的标准方法是什么?

这是一个简化的例子

测试实体

public class SearchResult implements Serializable {

    private String content;

    public SearchResult(String content) {
        this.content = content;
    }


    //.. toString, getters and setters
    
}
Run Code Online (Sandbox Code Playgroud)

生产者 1. 简单的无限流 -> 挂起

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() { …
Run Code Online (Sandbox Code Playgroud)

java event-stream quarkus vert.x-webclient mutiny

5
推荐指数
1
解决办法
1798
查看次数

Quarkus Reactive - 名称“security.jaxrs.deny-unannotated-endpoints”的多个匹配属性错误

使用 Quarkus 在执行时出现以下错误:

引起原因:java.lang.IllegalArgumentException:名称“security.jaxrs.deny-unannotated-endpoints”属性的多个匹配属性与公共布尔值io.quarkus.resteasy.reactive.common.runtime.JaxRsSecurityConfig.denyJaxRs和公共布尔值相匹配io.quarkus.resteasy.runtime.JaxRsSecurityConfig.denyJaxRs。这可能是因为您有一个不兼容的扩展组合,它们都定义了相同的属性(例如,包括反应式和阻塞数据库扩展)

我的 pom 属性是:

<compiler-plugin.version>3.8.1</compiler-plugin.version>
<maven.compiler.parameters>true</maven.compiler.parameters>
<maven.compiler.source>12</maven.compiler.source>
<maven.compiler.target>12</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus-plugin.version>1.13.3.Final</quarkus-plugin.version>
<quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
<quarkus.platform.version>1.13.3.Final</quarkus.platform.version>
Run Code Online (Sandbox Code Playgroud)

和依赖项:

  <dependencies>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-mutiny</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-vertx</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-jsonb</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-mutiny</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-resteasy-reactive</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-rest-client-reactive</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-jwt</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-jwt-build</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-jdbc-postgresql</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-arc</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-junit5</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.rest-assured</groupId>
      <artifactId>rest-assured</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
Run Code Online (Sandbox Code Playgroud)

我只是尝试使用 Multi from mutiny 和Server Sent Elements进行流式传输:

@GET …
Run Code Online (Sandbox Code Playgroud)

java resteasy quarkus mutiny

4
推荐指数
1
解决办法
2049
查看次数

Uni.combine().all().unis() 与 Multi..onItem().transformToMultiAndConcatenate().collect()

在我的 Quarkus 服务中,我需要从外部服务获取结果列表,我发现有两种方法可以实现相同的目标:

第一种方法基于Uni.combine().all()

List<Uni<Result>> results = new ArrayList();
for (Parameter p : parameters) {
  // callService returns Uni<Result>
  results.add(callService(p));
}
// collect all the results
Uni<List<Result>> combined = Uni.combine().all().unis(results)...
Run Code Online (Sandbox Code Playgroud)

第二种方法基于Multi..onItem().transformToMultiAndConcatenate().collect()

Multi.createFrom().iterable(parameters)
.onItem()
.transformToMultiAndConcatenate(p -> callService(p))
.collect().asList()
Run Code Online (Sandbox Code Playgroud)

一开始,我认为这两种方法之间不存在任何真正的区别,因为它们Uni是惰性评估的,或者Uni.combineMulti.collect我来说就像语法糖。但我还是想问一下,有什么区别吗?尤其是性能方面的差异。

使用第一种方法,我正在调试一个错误,当 的大小parameters超过 25 时,它开始给出错误,但在 25 以下则很好。因此,我怀疑第一种方法会产生非常高的 QPS,从而淹没外部服务。但是,我怀疑第二种方法是否有助于限制。

java reactive-programming microservices quarkus mutiny

3
推荐指数
1
解决办法
7462
查看次数

如何构建一个异步休息端点,在工作线程中调用阻塞操作并立即回复(Quarkus)

我检查了文档和 stackoverflow,但没有找到合适的方法。例如,这篇文章看起来非常接近:使用 Quarkus/Mutiny 在 Reactive REST GET 端点中调度阻塞服务 但是,我不想在我的服务中出现太多不必要的样板代码,最好的情况是根本不需要更改服务代码。

我通常只想调用一个使用实体管理器的服务方法,因此是一个阻塞操作,但是,想要立即向调用者返回一个字符串,例如“查询开始”或其他内容。我不需要回调对象,这只是一种即发即忘的方法。

我尝试过这样的事情

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
    return Uni.createFrom()
    .item("query started")
    .call(() -> service.startLongRunningQuery());
}
Run Code Online (Sandbox Code Playgroud)

但它不起作用 - >错误消息返回给调用者:

You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.",
Run Code Online (Sandbox Code Playgroud)

我实际上希望 quarkus …

resteasy quarkus mutiny

3
推荐指数
1
解决办法
2729
查看次数

如何对返回 Smallrye 叛变反应库的 Uni/Multi 的方法进行单元测试?

我在Quarks 应用程序中使用 Smallrye Mutiniy 反应库,因为 Quarks 应用程序本身支持它。

我正在尝试为服务类编写单元测试。我不确定如何为返回Uni / Multi的方法编写单元测试。

一个方法返回Uni<String>

public Uni<String> hello(final String name) {
    final String message = "Hello " + name;
    return Uni.createFrom().item(message);
}
Run Code Online (Sandbox Code Playgroud)

上述方法的实现单元

@Test
void testHello() {
    final Uni<String> casePass = hello("Ram");
    // assertion passes and all good with this.
    casePass.subscribe().with(message -> Assertions.assertEquals("Hello Ram", message));
    
    final Uni<String> caseFail = hello("Ravan");
    //  It is expected to fail the assertion, and it does. But the test is not failing, instead aseertion …
Run Code Online (Sandbox Code Playgroud)

quarkus smallrye-reactive-messaging smallrye mutiny

2
推荐指数
1
解决办法
5442
查看次数

Mutiny Uni 转换为原始类型

到目前为止,我已经在 Quarkus 中使用 Smallrye Mutiny 做了非常基本的事情。基本上,我有一两个非常小的 Web 服务,它们仅与 Web 应用程序交互。这些服务返回一个Uni<Response>.

现在我正在编写一个日志服务,我希望其他人向其传递信息。在此日志记录服务中,我需要返回一个值来调用服务。日志记录服务将返回该值作为Uni<Integer>. 我正在努力解决的是如何将调用服务中的返回值提取为int.

这是日志服务中的函数

    @GET
    @Path("/requestid")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<Integer> getMaxRequestId(){
        return service.getMaxRequestId();
    }

    public Uni<Integer> getMaxRequestId() {
        Integer result = Integer.valueOf(em.createQuery("select MAX(request_id) from service_requests").getFirstResult());
        
        if(result == null) {
            result = 0;
        }
        return Uni.createFrom().item(result += 1);
    }

Run Code Online (Sandbox Code Playgroud)

这是调用服务中的客户端代码

@Path("/requests")
public class RequestIdResource {
    
    @RestClient
    RequestIdServices service;
    
    @GET
    @Path("/requestid")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<Integer> getMaxRequestId(){
        return service.getMaxRequestId();
    }
}

    public void filter(ContainerRequestContext requestContext) throws IOException {

        int requestid …
Run Code Online (Sandbox Code Playgroud)

quarkus mutiny

2
推荐指数
1
解决办法
5238
查看次数

quarkus:反应性叛乱返回对象 Uni 引用而不是字符串值

当我到达终点时,我将获得此资源:

$ curl http://localhost:8080/hello
io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem@255ef91
Run Code Online (Sandbox Code Playgroud)

这是我的代码:

$ curl http://localhost:8080/hello
io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem@255ef91
Run Code Online (Sandbox Code Playgroud)

这些是我的 quarkus 项目依赖项:

@Path("/hello")
@AllArgsConstructor
public class GreetingResource {

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> hello() {
        return Uni.createFrom().item("item1");
    }
}
Run Code Online (Sandbox Code Playgroud)

有任何想法吗?

quarkus mutiny

1
推荐指数
1
解决办法
1431
查看次数