标签: reactive

了解Spring的Web反应框架

我目前正在nettyjOOQ上使用SpringBoot 2,spring-boot-starter-webflux开发一个应用程序. 下面是我经过数小时的研究和stackoverflow搜索后得出的代码.我已经内置了很多日志记录,以便查看在哪个线程上发生了什么.

UserController的:

@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
    return Mono.just(user)
            .map(it -> {
                logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
                return it;
            })
            .map(userService::create)
            .map(it -> {
                logger.debug("Sending response on thread: " + Thread.currentThread().getName());
                return ResponseEntity.status(HttpStatus.CREATED).body(it);
            })
            .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
Run Code Online (Sandbox Code Playgroud)

UserService:

public int create(ImUser user) {
    return Mono.just(user)
            .subscribeOn(Schedulers.elastic())
            .map(u -> {
                logger.debug("UserService thread: " + Thread.currentThread().getName());
                return imUserDao.insertUser(u);
            }) …
Run Code Online (Sandbox Code Playgroud)

java spring-boot project-reactor reactive

7
推荐指数
1
解决办法
2689
查看次数

Spring Web Reactive Framework多部分文件问题

我试图通过尝试以下方法使用Spring的Reactive Framework实现和映像上传:

@RestController
@RequestMapping("/images")
public class ImageController {

    @Autowired
    private IImageService imageService;

    @PostMapping(value = "", consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    Mono<ImageEntity> saveImage(@RequestBody Mono<FilePart> part) throws Exception{
         return part.flatMap(file -> imageService.saveImage(file));
    }
}
Run Code Online (Sandbox Code Playgroud)

但我一直收到415,出现以下错误信息:

Response status 415 with reason "Content type 'multipart/form-data;boundary=--0b227e57d1a5ca41' not supported\
Run Code Online (Sandbox Code Playgroud)

不确定是什么问题,我正在通过以下方式卷曲API:

 curl -v -F "file=@jinyang.gif" -H "Content-Type: multipart/form-data" localhost:8080/images
Run Code Online (Sandbox Code Playgroud)

我尝试过不同的标题和文件变体,结果相同.这里有点不知所措,因为我过去做过这件事情似乎工作正常.我在这篇文章中看到这个功能被合并了:

如何启用Spring Reactive Web MVC来处理Multipart文件?

spring spring-mvc project-reactor reactive

7
推荐指数
1
解决办法
3512
查看次数

使用 MySql 启动响应式 Spring (Webflux)

我将开发一个符合新功能 Webflux 的 Spring 启动应用程序。是否存在用于 Mysql 的驱动程序 JDBC?

我找到了一些 NoSql DB 的驱动程序(例如 MongoDB)。

你可以帮帮我吗?感谢和问候。

更新:

在 R2DBC 项目的官方网站上有驱动程序列表:https ://r2dbc.io/

java spring spring-boot reactive spring-webflux

7
推荐指数
2
解决办法
1万
查看次数

为 dart 中的每次发射生成带有延迟的流

我想从 dart 中具有特定延迟的列表中发出流中的值。

因此,从 [1,2,3](这是一个常规列表)中,我想像在单独的事件中一样发出值1...2...3

我尝试过这样的事情

List<int> myList = [1,2,3];
Subject _current = BehaviorSubject<int>();
Stream<int> get current$ => _current.stream.delay(Duration(seconds:1));

myList.forEach(current.add);
Run Code Online (Sandbox Code Playgroud)

但我得到...123的是,所以这将整个流延迟 1 秒,而不是列表中的每个值。

有任何想法吗?谢谢

dart reactive rxdart

7
推荐指数
2
解决办法
4365
查看次数

当控制器返回 EmitterProcessor 而不是 Flux 时,WebTestClient 在单元测试中阻塞

我正在开发一个使用 Spring WebFlux 堆栈的项目。我们有一个控制器,您可以在其中订阅特定对象的更新。此控制器返回EmitterProcessor客户端可以订阅的位置。当某些内容在 上发布时EmitterProcessor,订阅的客户端会收到通知。

这在实践中效果很好,但我的单元测试失败了。单元测试使用WebTestClient,它在exchange()操作返回 a时阻塞EmitterProcessor(也尝试了其他FluxProcessor实现,例如UnicastProcessor)。我得到的错误如下:

java.lang.IllegalStateException:5000 毫秒的阻塞读取超时

at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:117)
at reactor.core.publisher.Mono.block(Mono.java:1524)
at org.springframework.test.web.reactive.server.DefaultWebTestClient$DefaultRequestBodyUriSpec.exchange(DefaultWebTestClient.java:283)
Run Code Online (Sandbox Code Playgroud)

我找到了这个线程,它也报告exchange()WebTestClient块上的方法,但正如那里所解释的那样,它只是为了检索状态和标题而阻塞,所以这应该不是问题。此外,在Flux返回a 的情况下,这可以正常工作,如引用的testcase 所示

测试用例

我创建了一个从引用的测试用例派生的简单测试用例,并将其调整为Flux在一种情况下返回 a ,EmitterProcessor在另一种情况下返回an (失败)。您可能会注意到, 的断言EmitterProcessor应该失败,但由于阻塞exchange()调用,它永远不会到达那里。

另请注意,当我取消注释该行时processor.onNext("hello");,将返回此内容并且测试用例成功。

at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:117)
at reactor.core.publisher.Mono.block(Mono.java:1524)
at org.springframework.test.web.reactive.server.DefaultWebTestClient$DefaultRequestBodyUriSpec.exchange(DefaultWebTestClient.java:283)
Run Code Online (Sandbox Code Playgroud)

spring unit-testing reactive spring-webflux

7
推荐指数
1
解决办法
1876
查看次数

如何在 MongoClientSettings 中设置每个主机的连接设置?

阻止 MongoDB 驱动程序具有MongoClientOptions,其中包含客户端设置。
反应式驱动程序MongoClients.create()期望MongoClientSettings作为参数。
大多数设置MongoClientOptionsMongoClientSettings.

connectionsPerHost但我在 中找不到替代方案MongoClientSettings
我错过了什么吗?

java mongodb reactive

7
推荐指数
1
解决办法
3201
查看次数

项目反应堆:仅当第一项未发出时才出现通量超时

我有一个通量,应该几乎立即发出一个项目。此后,它可能会在很长一段时间内不再发射物品。如果最初没有收到任何项目,我希望它超时。但如果我使用该timeout(Duration)方法,每次在给定时间内没有收到任何项目时,它都会超时。

我现在的代码由于上述原因而不起作用:

messageFlux.timeout(Duration.ofSeconds(30)).doOnError(e -> {
    // handle error
}).subscribe(m -> messageService.consumeMessage(m));
Run Code Online (Sandbox Code Playgroud)

有没有一种方法可以有效地做到这一点?

java flux reactive

7
推荐指数
1
解决办法
1124
查看次数

Vue3 中的 watch() 会在组件卸载时自动停止吗?

根据Vue3 DocwatchEffect卸载组件时将停止。

当在组件的 setup() 函数或生命周期挂钩期间调用 watchEffect 时,观察器将链接到组件的生命周期,并在组件卸载时自动停止。

watch在自动停止方面是否有相同的行为?谢谢!

javascript watch reactive vuejs3

7
推荐指数
1
解决办法
6547
查看次数

R Shiny - 如何在更新依赖反应图之前更新依赖反应 selectInput

应用程序结构

我有一个闪亮的应用程序,具有典型的侧边栏面板+主面板结构。

  • 侧边栏面板:侧边栏面板中有多个 selectInput 小部件,其中每个 selectInput 中的选择取决于前一个 selectInput 的选定值。(即,用户从 selectInput 1 中选择一个数据集,从 selectInput 2 中选择一个变量,其中 selectInput #2 中可用作“选项”的变量取决于输入 1 的选择)
  • 主面板:有一个基本的 ggplot2 可视化,它取决于侧边栏面板中所做的 2 个输入选择(数据集和变量)。

问题

当用户在 selectInput #1 中选择新数据集时,selectInput #2(可用变量)和绘图都需要更新。我希望首先更新 selectInput #2,然后更新绘图。然而,似乎情节总是在第二个 selectInput 有机会更新之前继续更新。这会导致绘图尝试渲染无效绘图 - 即尝试使用 iris 数据集渲染 mtcars 变量的绘图,反之亦然。

有没有办法优先考虑 selectInput #2 的反应式更新发生在renderPlot 的反应式更新之前?

笔记

  • 作为用户体验要求,我避免使用按钮来渲染绘图。我需要绘图根据选择实时动态更新。
  • 在我的 reprex 中,我包含了打印语句来描述绘图如何尝试使用无效的选择组合进行更新。
library(shiny)
library(ggplot2)
library(dplyr)

# Define UI for application that draws a histogram
ui <- fluidPage(

    titlePanel("Reactivity Test"),

    # Sidebar with two input widgets
    sidebarLayout(
        sidebarPanel(
            selectInput(inputId …
Run Code Online (Sandbox Code Playgroud)

r shiny reactive shiny-reactivity shinyapps

7
推荐指数
1
解决办法
1718
查看次数

使用 Project Reactor 管理对共享资源的访问

如何使用 Project Reactor 管理对共享资源的访问?

给定一个虚构的关键组件,该组件当时只能执行操作(文件存储、昂贵的远程服务等),如果存在对该组件的多个访问点(多个 API 方法、订户...)?如果资源可以自由执行该操作,则应该立即执行它,如果其他操作已经在进行中,则将我的操作添加到队列中,并在我的操作完成后完成我的 Mono。

我的想法是将任务添加到flux队列中,该队列一个接一个地执行任务,并返回一个Mono,一旦队列中的任务完成,该Mono就会完成,而不会阻塞。

class CriticalResource {

    private final Sinks.Many<Mono<?>> taskExecutor = Sinks.many()
                                                          .unicast()
                                                          .onBackpressureBuffer();

    private final Disposable taskExecutorDisposable =  taskExecutor.asFlux()
                                                                      .concatMap(Function.identity()) //this executes actions in sequential order
                                                                      .subscribe();


    public Mono<Void> resourceOperation1() {
        doSomething();
        .as(this::sequential);
    }


    public Mono<Void> resourceOperation2() {
        doSomethingElse();
        .as(this::sequential);
    }


    public Mono<Void> resourceOperation3() {
        doSomething();
        .then(somethingElse())
        .as(this::sequential);
    }

    private <T> Mono<T> sequential(Mono<T> action) {
        return Mono.defer(() -> {
            Sinks.One<T> actionResult = Sinks.one(); //create a new mono which should complete when our action …
Run Code Online (Sandbox Code Playgroud)

concurrency project-reactor reactive

7
推荐指数
1
解决办法
345
查看次数