我有一个 Spring Flux 应用程序,在某些时候我需要在后台执行一些繁重的任务,调用者(HTTP 请求)不需要等到该任务完成。
如果没有反应器,我可能只会使用Async注释,在不同的线程上执行该方法。对于reactor,我不确定是否应该继续采用这种方法,或者是否已经有内置机制可以实现这一点。
例如,给定一个接受Resource对象的Controller:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
processor.run(r); // the caller should not wait for the resource to be processed
return repository.save(r);
}
Run Code Online (Sandbox Code Playgroud)
和一个处理器类:
@Async
void run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
Mono<String> result = webClient.get()
.retrieve()
.bodyToMono(String.class);
String response = result.block(); //block for now
}
Run Code Online (Sandbox Code Playgroud)
HTTP 调用方/create不需要等待run方法完成。
我有一个反应式管道来处理传入的请求。对于每个请求,我需要调用一个与业务相关的函数 ( doSomeRelevantProcessing)。
完成后,我需要通知一些外部服务发生了什么。管道的那部分不应增加总体响应时间。此外,通知此外部系统并不是业务关键:在管道的主要部分完成后给出快速响应比确保通知成功更重要。
据我所知,在后台运行某些内容而不减慢整个过程的唯一方法是直接在管道中订阅,从而实现“一劳永逸”的心态。
除了在 内订阅之外,还有其他好的选择吗flatmap?我有点担心如果通知外部服务花费的时间比原始处理时间长并且同时收到大量请求会发生什么情况。这是否会导致内存耗尽或整个进程阻塞?
fun runPipeline(incoming: Mono<Request>) = incoming
.flatMap { doSomeRelevantProcessing(it) } // this should not be delayed
.flatMap { doBackgroundJob(it) } // this can take a moment, but is not super critical
fun doSomeRelevantProcessing(request: Request) = Mono.just(request) // do some processing
fun doBackgroundJob(request: Request) = Mono.deferContextual { ctx: ContextView ->
val notification = "notification" // build an object from context
// this uses non-blocking HTTP (i.e. webclient), so it can …Run Code Online (Sandbox Code Playgroud) 我有一个方法
@Service
public class MyService {
public Mono<Integer> processData() {
... // very long reactive operation
}
}
Run Code Online (Sandbox Code Playgroud)
在正常的程序流程中,我通过 Kafka 事件异步调用此方法。
出于测试目的,我需要将该方法公开为 Web 服务,但该方法应公开为异步:仅返回 HTTP 代码202 ACCEPTED并继续在后台进行数据处理。
Mono#subscribe()仅仅调用控制器方法并返回可以吗(=它不会有任何不需要的副作用)吗?
@Service
public class MyService {
public Mono<Integer> processData() {
... // very long reactive operation
}
}
Run Code Online (Sandbox Code Playgroud)
还是这样做更好(这里我对 IntelliJ 的警告感到困惑,也许与https://youtrack.jetbrains.com/issue/IDEA-276018相同?):
@RestController
@RequiredArgsConstructor
public class MyController {
private final MyService service;
@GetMapping
public void processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
Run Code Online (Sandbox Code Playgroud)
或者其他解决方案?
如何实现内部事件总线以在 webflux spring 堆栈中执行异步操作?
我想要一个服务发出一个事件:
@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
override suspend fun deleteEntry(entryId: Long) {
entryRepository.deleteById(entryId)
publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
}
}
Run Code Online (Sandbox Code Playgroud)
发布者服务不知道的不同组件应该能够决定对该事件做出反应。
@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
// do stuff
}
}
Run Code Online (Sandbox Code Playgroud)
在 MVC 应用程序中,我将使用在处理程序 ( ) 上ApplicationEventPublisher发布事件 ( publishEvent) 和@EventListener+ 。@AsynconDeleteEntry
反应式堆栈中的等效项是什么?
我考虑的另一个选择是运行嵌入式消息服务,因为这应该意味着异步语义。但这对于一个简单的场景来说感觉是很大的开销。
我找到了这些SO线程
但他们不回答这种情况,因为他们假设发布者知道侦听器。但我需要松散耦合。
我还发现了这些春季问题 …
java ×2
spring ×2
asynchronous ×1
optimization ×1
spring-async ×1
spring-boot ×1
terminology ×1