我目前正在netty和jOOQ上使用SpringBoot 2,spring-boot-starter-webflux开发一个应用程序.
下面是我经过数小时的研究和stackoverflow搜索后得出的代码.我已经内置了很多日志记录,以便查看在哪个线程上发生了什么.
UserController的:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
return Mono.just(user)
.map(it -> {
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return it;
})
.map(userService::create)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
Run Code Online (Sandbox Code Playgroud)
UserService:
public int create(ImUser user) {
return Mono.just(user)
.subscribeOn(Schedulers.elastic())
.map(u -> {
logger.debug("UserService thread: " + Thread.currentThread().getName());
return imUserDao.insertUser(u);
}) …
Run Code Online (Sandbox Code Playgroud) 我试图通过尝试以下方法使用Spring的Reactive Framework实现和映像上传:
@RestController
@RequestMapping("/images")
public class ImageController {
@Autowired
private IImageService imageService;
@PostMapping(value = "", consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
Mono<ImageEntity> saveImage(@RequestBody Mono<FilePart> part) throws Exception{
return part.flatMap(file -> imageService.saveImage(file));
}
}
Run Code Online (Sandbox Code Playgroud)
但我一直收到415,出现以下错误信息:
Response status 415 with reason "Content type 'multipart/form-data;boundary=--0b227e57d1a5ca41' not supported\
Run Code Online (Sandbox Code Playgroud)
不确定是什么问题,我正在通过以下方式卷曲API:
curl -v -F "file=@jinyang.gif" -H "Content-Type: multipart/form-data" localhost:8080/images
Run Code Online (Sandbox Code Playgroud)
我尝试过不同的标题和文件变体,结果相同.这里有点不知所措,因为我过去做过这件事情似乎工作正常.我在这篇文章中看到这个功能被合并了:
我将开发一个符合新功能 Webflux 的 Spring 启动应用程序。是否存在用于 Mysql 的驱动程序 JDBC?
我找到了一些 NoSql DB 的驱动程序(例如 MongoDB)。
你可以帮帮我吗?感谢和问候。
更新:
在 R2DBC 项目的官方网站上有驱动程序列表:https ://r2dbc.io/
我想从 dart 中具有特定延迟的列表中发出流中的值。
因此,从 [1,2,3](这是一个常规列表)中,我想像在单独的事件中一样发出值1...2...3
。
我尝试过这样的事情
List<int> myList = [1,2,3];
Subject _current = BehaviorSubject<int>();
Stream<int> get current$ => _current.stream.delay(Duration(seconds:1));
myList.forEach(current.add);
Run Code Online (Sandbox Code Playgroud)
但我得到...123
的是,所以这将整个流延迟 1 秒,而不是列表中的每个值。
有任何想法吗?谢谢
我正在开发一个使用 Spring WebFlux 堆栈的项目。我们有一个控制器,您可以在其中订阅特定对象的更新。此控制器返回EmitterProcessor
客户端可以订阅的位置。当某些内容在 上发布时EmitterProcessor
,订阅的客户端会收到通知。
这在实践中效果很好,但我的单元测试失败了。单元测试使用WebTestClient,它在exchange()
操作返回 a时阻塞EmitterProcessor
(也尝试了其他FluxProcessor
实现,例如UnicastProcessor
)。我得到的错误如下:
java.lang.IllegalStateException:5000 毫秒的阻塞读取超时
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:117)
at reactor.core.publisher.Mono.block(Mono.java:1524)
at org.springframework.test.web.reactive.server.DefaultWebTestClient$DefaultRequestBodyUriSpec.exchange(DefaultWebTestClient.java:283)
Run Code Online (Sandbox Code Playgroud)
我找到了这个线程,它也报告exchange()
了WebTestClient
块上的方法,但正如那里所解释的那样,它只是为了检索状态和标题而阻塞,所以这应该不是问题。此外,在Flux
返回a 的情况下,这可以正常工作,如引用的testcase 所示。
我创建了一个从引用的测试用例派生的简单测试用例,并将其调整为Flux
在一种情况下返回 a ,EmitterProcessor
在另一种情况下返回an (失败)。您可能会注意到, 的断言EmitterProcessor
应该失败,但由于阻塞exchange()
调用,它永远不会到达那里。
另请注意,当我取消注释该行时processor.onNext("hello");
,将返回此内容并且测试用例成功。
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:117)
at reactor.core.publisher.Mono.block(Mono.java:1524)
at org.springframework.test.web.reactive.server.DefaultWebTestClient$DefaultRequestBodyUriSpec.exchange(DefaultWebTestClient.java:283)
Run Code Online (Sandbox Code Playgroud) 阻止 MongoDB 驱动程序具有MongoClientOptions
,其中包含客户端设置。
反应式驱动程序MongoClients.create()
期望MongoClientSettings
作为参数。
大多数设置MongoClientOptions
在MongoClientSettings
.
connectionsPerHost
但我在 中找不到替代方案MongoClientSettings
。
我错过了什么吗?
我有一个通量,应该几乎立即发出一个项目。此后,它可能会在很长一段时间内不再发射物品。如果最初没有收到任何项目,我希望它超时。但如果我使用该timeout(Duration)
方法,每次在给定时间内没有收到任何项目时,它都会超时。
我现在的代码由于上述原因而不起作用:
messageFlux.timeout(Duration.ofSeconds(30)).doOnError(e -> {
// handle error
}).subscribe(m -> messageService.consumeMessage(m));
Run Code Online (Sandbox Code Playgroud)
有没有一种方法可以有效地做到这一点?
根据Vue3 Doc,watchEffect
卸载组件时将停止。
当在组件的 setup() 函数或生命周期挂钩期间调用 watchEffect 时,观察器将链接到组件的生命周期,并在组件卸载时自动停止。
watch
在自动停止方面是否有相同的行为?谢谢!
我有一个闪亮的应用程序,具有典型的侧边栏面板+主面板结构。
当用户在 selectInput #1 中选择新数据集时,selectInput #2(可用变量)和绘图都需要更新。我希望首先更新 selectInput #2,然后更新绘图。然而,似乎情节总是在第二个 selectInput 有机会更新之前继续更新。这会导致绘图尝试渲染无效绘图 - 即尝试使用 iris 数据集渲染 mtcars 变量的绘图,反之亦然。
有没有办法优先考虑 selectInput #2 的反应式更新发生在renderPlot 的反应式更新之前?
library(shiny)
library(ggplot2)
library(dplyr)
# Define UI for application that draws a histogram
ui <- fluidPage(
titlePanel("Reactivity Test"),
# Sidebar with two input widgets
sidebarLayout(
sidebarPanel(
selectInput(inputId …
Run Code Online (Sandbox Code Playgroud) 如何使用 Project Reactor 管理对共享资源的访问?
给定一个虚构的关键组件,该组件当时只能执行操作(文件存储、昂贵的远程服务等),如果存在对该组件的多个访问点(多个 API 方法、订户...)?如果资源可以自由执行该操作,则应该立即执行它,如果其他操作已经在进行中,则将我的操作添加到队列中,并在我的操作完成后完成我的 Mono。
我的想法是将任务添加到flux队列中,该队列一个接一个地执行任务,并返回一个Mono,一旦队列中的任务完成,该Mono就会完成,而不会阻塞。
class CriticalResource {
private final Sinks.Many<Mono<?>> taskExecutor = Sinks.many()
.unicast()
.onBackpressureBuffer();
private final Disposable taskExecutorDisposable = taskExecutor.asFlux()
.concatMap(Function.identity()) //this executes actions in sequential order
.subscribe();
public Mono<Void> resourceOperation1() {
doSomething();
.as(this::sequential);
}
public Mono<Void> resourceOperation2() {
doSomethingElse();
.as(this::sequential);
}
public Mono<Void> resourceOperation3() {
doSomething();
.then(somethingElse())
.as(this::sequential);
}
private <T> Mono<T> sequential(Mono<T> action) {
return Mono.defer(() -> {
Sinks.One<T> actionResult = Sinks.one(); //create a new mono which should complete when our action …
Run Code Online (Sandbox Code Playgroud) reactive ×10
java ×4
spring ×3
spring-boot ×2
concurrency ×1
dart ×1
flux ×1
javascript ×1
mongodb ×1
r ×1
rxdart ×1
shiny ×1
shinyapps ×1
spring-mvc ×1
unit-testing ×1
vuejs3 ×1
watch ×1