相关疑难解决方法(0)

276
推荐指数
9
解决办法
16万
查看次数

Spring Flux 和 Async 注释

我有一个 Spring Flux 应用程序,在某些时候我需要在后台执行一些繁重的任务,调用者(HTTP 请求)不需要等到该任务完成。

如果没有反应器,我可能只会使用Async注释,在不同的线程上执行该方法。对于reactor,我不确定是否应该继续采用这种方法,或者是否已经有内置机制可以实现这一点。

例如,给定一个接受Resource对象的Controller

@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
    processor.run(r); // the caller should not wait for the resource to be processed
    return repository.save(r);
}
Run Code Online (Sandbox Code Playgroud)

和一个处理器类:

@Async
void run(Resource r) { 
    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> result = webClient.get()
                                   .retrieve()
                                   .bodyToMono(String.class);
    String response = result.block(); //block for now
}
Run Code Online (Sandbox Code Playgroud)

HTTP 调用方/create不需要等待run方法完成。

java spring project-reactor spring-async spring-webflux

7
推荐指数
2
解决办法
7079
查看次数

反应式管道中的后台任务(即发即忘)

我有一个反应式管道来处理传入的请求。对于每个请求,我需要调用一个与业务相关的函数 ( doSomeRelevantProcessing)。

完成后,我需要通知一些外部服务发生了什么。管道的那部分不应增加总体响应时间。此外,通知此外部系统并不是业务关键:在管道的主要部分完成后给出快速响应比确保通知成功更重要。

据我所知,在后台运行某些内容而不减慢整个过程的唯一方法是直接在管道中订阅,从而实现“一劳永逸”的心态。

除了在 内订阅之外,还有其他好的选择吗flatmap?我有点担心如果通知外部服务花费的时间比原始处理时间长并且同时收到大量请求会发生什么情况。这是否会导致内存耗尽或整个进程阻塞?

fun runPipeline(incoming: Mono<Request>) = incoming
    .flatMap { doSomeRelevantProcessing(it) } // this should not be delayed
    .flatMap { doBackgroundJob(it) } // this can take a moment, but is not super critical

fun doSomeRelevantProcessing(request: Request) = Mono.just(request) // do some processing

fun doBackgroundJob(request: Request) = Mono.deferContextual { ctx: ContextView ->
    val notification = "notification" // build an object from context

    // this uses non-blocking HTTP (i.e. webclient), so it can …
Run Code Online (Sandbox Code Playgroud)

reactive-programming project-reactor spring-webflux

5
推荐指数
1
解决办法
1439
查看次数

如何将 Mono 变成真正的异步(非反应式!)方法调用?

我有一个方法

@Service
public class MyService {
    public Mono<Integer> processData() {
        ... // very long reactive operation
    }
}
Run Code Online (Sandbox Code Playgroud)

在正常的程序流程中,我通过 Kafka 事件异步调用此方法。

出于测试目的,我需要将该方法公开为 Web 服务,但该方法应公开为异步:仅返回 HTTP 代码202 ACCEPTED并继续在后台进行数据处理。

Mono#subscribe()仅仅调用控制器方法并返回可以吗(=它不会有任何不需要的副作用)吗?

@Service
public class MyService {
    public Mono<Integer> processData() {
        ... // very long reactive operation
    }
}
Run Code Online (Sandbox Code Playgroud)

还是这样做更好(这里我对 IntelliJ 的警告感到困惑,也许与https://youtrack.jetbrains.com/issue/IDEA-276018相同?):

@RestController
@RequiredArgsConstructor
public class MyController {
    private final MyService service;

    @GetMapping
    public void processData() {
        service.processData()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    }
}
Run Code Online (Sandbox Code Playgroud)

或者其他解决方案?

java asynchronous project-reactor spring-webflux

5
推荐指数
1
解决办法
5373
查看次数

webflux:内部事件总线和异步、Loosley 耦合的事件侦听器

如何实现内部事件总线以在 webflux spring 堆栈中执行异步操作?

我想要一个服务发出一个事件:

@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
  }
}
Run Code Online (Sandbox Code Playgroud)

发布者服务不知道的不同组件应该能够决定对该事件做出反应。

@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
  override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
    // do stuff
  }
}
Run Code Online (Sandbox Code Playgroud)

在 MVC 应用程序中,我将使用在处理程序 ( ) 上ApplicationEventPublisher发布事件 ( publishEvent) 和@EventListener+ 。@AsynconDeleteEntry

反应式堆栈中的等效项是什么?

我考虑的另一个选择是运行嵌入式消息服务,因为这应该意味着异步语义。但这对于一个简单的场景来说感觉是很大的开销。


我找到了这些SO线程

但他们不回答这种情况,因为他们假设发布者知道侦听器。但我需要松散耦合。

我还发现了这些春季问题 …

spring spring-boot spring-webflux

3
推荐指数
1
解决办法
2438
查看次数