webflux:内部事件总线和异步、Loosley 耦合的事件侦听器

Stu*_*uck 3 spring spring-boot spring-webflux

如何实现内部事件总线以在 webflux spring 堆栈中执行异步操作?

我想要一个服务发出一个事件:

@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
  }
}
Run Code Online (Sandbox Code Playgroud)

发布者服务不知道的不同组件应该能够决定对该事件做出反应。

@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
  override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
    // do stuff
  }
}
Run Code Online (Sandbox Code Playgroud)

在 MVC 应用程序中,我将使用在处理程序 ( ) 上ApplicationEventPublisher发布事件 ( publishEvent) 和@EventListener+ 。@AsynconDeleteEntry

反应式堆栈中的等效项是什么?

我考虑的另一个选择是运行嵌入式消息服务,因为这应该意味着异步语义。但这对于一个简单的场景来说感觉是很大的开销。


我找到了这些SO线程

但他们不回答这种情况,因为他们假设发布者知道侦听器。但我需要松散耦合。

我还发现了这些春季问题

特别是看到这个评论承诺建议这一点:

Mono.fromRunnable(() -> context.publishEvent(...))
Run Code Online (Sandbox Code Playgroud)

据我了解,我可以直接使用,@EventListener因为我完全可以不传播反应性上下文。

但是请有人解释一下线程限制的含义以及这在反应式堆栈中是否合法?


更新

从测试来看,这样做感觉很好:

@Service
class FeedServiceImpl(
  val applicationEventPublisher: ApplicationEventPublisher,
) : FeedService {
  @EventListener
  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler started")
    runBlocking {
      // do stuff that takes some time
      delay(1000)
    }
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    applicationEventPublisher.publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }
}
Run Code Online (Sandbox Code Playgroud)

请注意,这handle不是一个挂起函数,因为@EventListener必须有一个参数,并且协程在幕后引入了延续参数。然后,处理程序启动一个新的阻塞协程作用域,这很好,因为由于@Async.

输出是:

2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl  : ThreadId: 38
2021-05-13 12:15:20.755  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler started
2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl   : Publisher done
2021-05-13 12:15:21.758  INFO 20252 --- [         task-1] ...FeedServiceImpl   : ThreadId: 54
2021-05-13 12:15:21.759  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler done
Run Code Online (Sandbox Code Playgroud)

更新2

另一种不使用 @Async 的方法是:

  @EventListener
//  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler start")
    log.info("Handler ThreadId: ${Thread.currentThread().id}")
    runBlocking {
      log.info("Handler block start")
      delay(1000)
      log.info("Handler block ThreadId: ${Thread.currentThread().id}")
      log.info("Handler block end")
    }
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    feedRepository.deleteById(entryId)
    Mono.fromRunnable<Unit> {
      applicationEventPublisher.publishEvent(
        FeedEntryDeletedEvent(
          timestamp = time.utcMillis(),
          entryId = entryId,
        )
      )
    }
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe()
    log.info("Publisher ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }

Run Code Online (Sandbox Code Playgroud)
2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher ThreadId: 38
2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher done
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler start
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler ThreadId: 53
2021-05-13 13:06:54.505  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block start
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block ThreadId: 53
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block end
2021-05-13 13:06:55.540  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler done
Run Code Online (Sandbox Code Playgroud)

但是,我仍然不明白对负载下的应用程序的影响,并且将反应性操作与执行此操作的处理程序混合在一起感觉是错误的runBlocking { }

Eru*_*aro 8


Reactor 提供 Sink。您可以像事件总线一样使用它。看看下面的例子。

@Configuration
public class EventNotificationConfig {

    @Bean
    public Sinks.Many<EventNotification> eventNotifications() {
        return Sinks.many().replay().latest();
    }

}
Run Code Online (Sandbox Code Playgroud)

您在类中创建一个 Sink Bean @Configuration。这可以用来发出新事件,并且可以将其转换为订阅者的 Flux。

@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationUsecase {

    private final @NonNull Sinks.Many<EventNotification> eventNotifications;


    /**
     * Provide a flux with our notifications.
     *
     * @return a Flux
     */
    public Flux<EventNotification> getNotifications() {
        return eventNotifications.asFlux();
    }

    /**
     * Emit a new event to the sink.
     *
     * @param eventId
     * @param status
     * @param payload
     */
    public void emitNotification(final String eventId, final EventNotification.Status status, final Map<String, Object> payload) {
        eventNotifications.tryEmitNext(EventNotification.builder()
          .eventId(eventId)
          .status(status)
          .payload(payload).build());
    }

}

Run Code Online (Sandbox Code Playgroud)

您的应用程序中最多可以保留一个 Sink 实例。订阅不同类型的事件可以通过各种订阅者可以应用于 Flux 的过滤器来实现。


@Component
@RequiredArgsConstructor
@Slf4j
public class EventListener {

    private final @NonNull NotificationUsecase notificationUsecase;


    /**
     * Start listening to events as soon as class EventListener
     * has been constructed.
     *
     * Listening will continue until the Flux emits a 'completed'
     * signal.
     */
    @PostConstruct
    public void init() {

        this.listenToPings()
                .subscribe();
        this.listenToDataFetched()
                .subscribe();
    }


    public Flux<EventNotification> listenToPings() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.PING))
                .doOnNext(notification -> log.info("received PING: {}", notification));
    }

    public Flux<EventNotification> listenToDataFetched() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
                .doOnNext(notification -> log.info("received data: {}", notification));
    }
}

    
    public Flux<EventNotification> listenToDataFetchedAndWriteToDatabase() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(notification -> reactiveMongoRepository
                    .saveAndReturnNewObject(notification)
                    .doOnNext(log.info("I just saved something and returned an instance of NewObject!"))
                    .zipWith(Mono.just(notification)))
                .map(tuple->tuple.getT2())
                .filter(notification -> notification.getStatus().equals(PlanningNotification.Status.DATA_FETCHED))
                .doOnNext(notification -> log.info("received data: {} - saved ", notification));
    }
Run Code Online (Sandbox Code Playgroud)

发出新事件同样简单。只需调用发出方法:



notificationUsecase.emitNotification(eventId, EventNotification.Status.PING, payload);


Run Code Online (Sandbox Code Playgroud)


归档时间:

查看次数:

2438 次

最近记录:

4 年 前