当前位置: 首页 > 知识库问答 >
问题:

网络流量:内部事件总线和异步、松散耦合的事件监听器

支华池
2023-03-14

如何实现内部事件总线以在webflux spring堆栈中执行异步操作?

我想要一个服务来发出一个事件:

@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
  }
}

发布者服务不知道的另一个组件应该能够决定对该事件做出反应。

@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
  override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
    // do stuff
  }
}

在MVC应用程序中,我将使用ApplicationEventPublisher来发布事件(PublishEvent)和@EventListener@Async在处理程序上(onDeleteEntry)。

反应式堆栈中的等价物是什么?

我考虑的另一个选择是运行嵌入式消息服务,因为这应该意味着异步语义学。但是对于一个简单的场景来说,这感觉像是一个很大的开销。

我发现这些线

  • 用Reactor点火忘记

但他们没有回答这种情况,因为他们假设发布者知道侦听器。但我需要松耦合。

我还发现了这些Spring特刊

  • https://github.com/spring-projects/spring-framework/issues/21025

特别是看到这个评论promise建议:

Mono.fromRunnable(() -> context.publishEvent(...))

据我所知,我可以只使用@EventListener,因为我完全可以不传播反应性上下文。

但是,有人能解释一下线程边界的含义吗?这在Reactor栈中是否合法?

更新

通过测试,这样做感觉很好:

@Service
class FeedServiceImpl(
  val applicationEventPublisher: ApplicationEventPublisher,
) : FeedService {
  @EventListener
  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler started")
    runBlocking {
      // do stuff that takes some time
      delay(1000)
    }
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    applicationEventPublisher.publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }
}

请注意,handle不是挂起函数,因为EventListener必须有一个参数,协同例程在场景后面引入continuation参数。然后,处理程序启动一个新的阻塞协同路由作用域,该作用域很好,因为它位于不同的线程上,因为它是异步的。

输出为:

2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl  : ThreadId: 38
2021-05-13 12:15:20.755  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler started
2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl   : Publisher done
2021-05-13 12:15:21.758  INFO 20252 --- [         task-1] ...FeedServiceImpl   : ThreadId: 54
2021-05-13 12:15:21.759  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler done

更新2

不使用@Async的另一种方法是:

  @EventListener
//  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler start")
    log.info("Handler ThreadId: ${Thread.currentThread().id}")
    runBlocking {
      log.info("Handler block start")
      delay(1000)
      log.info("Handler block ThreadId: ${Thread.currentThread().id}")
      log.info("Handler block end")
    }
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    feedRepository.deleteById(entryId)
    Mono.fromRunnable<Unit> {
      applicationEventPublisher.publishEvent(
        FeedEntryDeletedEvent(
          timestamp = time.utcMillis(),
          entryId = entryId,
        )
      )
    }
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe()
    log.info("Publisher ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }

2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher ThreadId: 38
2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher done
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler start
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler ThreadId: 53
2021-05-13 13:06:54.505  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block start
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block ThreadId: 53
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block end
2021-05-13 13:06:55.540  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler done

然而,我仍然不理解负载下应用程序的含义,将反应式操作与执行运行阻塞{}的处理程序混合在一起感觉是错误的。

共有1个答案

薛淳
2023-03-14

Reactor提供水槽。您可以像使用事件总线一样使用它。请看下面的示例。

@Configuration
public class EventNotificationConfig {

    @Bean
    public Sinks.Many<EventNotification> eventNotifications() {
        return Sinks.many().replay().latest();
    }

}

您在@Configuration类中创建了Sink的Bean。这可用于发出新事件,并且可以将其转换为订阅者的Flux。

@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationUsecase {

    private final @NonNull Sinks.Many<EventNotification> eventNotifications;


    /**
     * Provide a flux with our notifications.
     *
     * @return a Flux
     */
    public Flux<EventNotification> getNotifications() {
        return eventNotifications.asFlux();
    }

    /**
     * Emit a new event to the sink.
     *
     * @param eventId
     * @param status
     * @param payload
     */
    public void emitNotification(final String eventId, final EventNotification.Status status, final Map<String, Object> payload) {
        eventNotifications.tryEmitNext(EventNotification.builder()
          .eventId(eventId)
          .status(status)
          .payload(payload).build());
    }

}

您最多可以在应用程序中保留一个接收器实例。订阅不同类型的事件可以通过各种订阅者可以应用于流量的过滤器来实现。


@Component
@RequiredArgsConstructor
@Slf4j
public class EventListener {

    private final @NonNull NotificationUsecase notificationUsecase;


    /**
     * Start listening to events as soon as class EventListener
     * has been constructed.
     *
     * Listening will continue until the Flux emits a 'completed'
     * signal.
     */
    @PostConstruct
    public void init() {

        this.listenToPings()
                .subscribe();
        this.listenToDataFetched()
                .subscribe();
    }


    public Flux<EventNotification> listenToPings() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.PING))
                .doOnNext(notification -> log.info("received PING: {}", notification));
    }

    public Flux<EventNotification> listenToDataFetched() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
                .doOnNext(notification -> log.info("received data: {}", notification));
    }
}

    
    public Flux<EventNotification> listenToDataFetchedAndWriteToDatabase() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(notification -> reactiveMongoRepository
                    .saveAndReturnNewObject(notification)
                    .doOnNext(log.info("I just saved something and returned an instance of NewObject!"))
                    .zipWith(Mono.just(notification)))
                .map(tuple->tuple.getT2())
                .filter(notification -> notification.getStatus().equals(PlanningNotification.Status.DATA_FETCHED))
                .doOnNext(notification -> log.info("received data: {} - saved ", notification));
    }

发出新事件同样简单。只需调用emit方法:



notificationUsecase.emitNotification(eventId, EventNotification.Status.PING, payload);


 类似资料:
  • Blade 中提供一个方法帮助开发者可以自定义的监听应用程序运行中的一些生命周期。比如 Session 的创建与销毁,应用启动结束后等。 支持的事件类型有如下: public enum EventType { SERVER_STARTING, // 服务准备启动 SERVER_STARTED, // 服务启动成功 SERVER_STOPPING, //

  • Nutz.Ioc 容器有三种事件: 对象被创建(create事件) 对象被从容器中取出(fetch事件) 对象被销毁(depose事件) 在这三种时刻,你如果想做一些特殊的操作,比如,当一个数据源被销毁时,你希望能够关闭所有的连接, 声明一下,你想监听什么事件,以及怎么监听。 注: 如果你的对象是 "singleton: false",那么容器创建了对象后就会立即忘记它的存在。因为鬼才知道 你打算

  • 全局事件 事件监听 注解监听 以imi/src/Listener/Init.php为例 <?php namespace Imi\Listener; use Imi\Event\EventParam; use Imi\Event\IEventListener; use Imi\Bean\Annotation\Listener; /** * @Listener(eventName="IMI.IN

  • Chrome DevTools命令行API提供了多种方式来观察和检查事件监听器。JavaScript在交互式页面中起着中心作用,并且浏览器为您提供了一些有用的工具来调试事件和事件处理程序。 TL;DR 使用monitorEvents()监听某一类型的事件。 使用unmonitorEvents()停止监听。 使用getEventListeners()获取DOM元素的监听器。 使用Event List

  • sTree触发容器上的各种事件。您可以查看所有事件的列表以了解要听的内容。 要获取有关事件的更多信息,请检查其data参数。 在大多数情况下,涉及节点的情况下,您将传递整个节点对象。如果在某处获取ID字符串并想要检查该节点,则只需使用 .get_node()。内部节点对象与用于加载的JSON格式非常相似,但是具有一些额外的属性,这可能很有用:children是节点的直接子节点的children_d

  • 应用事件监听器是实现一个或多个 Servlet 事件监听器接口的类。它们是在部署 Web 应用时,实例化并注册到 Web 容器中。它们由开发人员在WAR 包中提供。 Servlet 事件监听器支持在 ServletContext、HttpSession 和ServletRequest 状态改变时进行事件通知。Servlet 上下文监听器是用来管理应用的资源或 JVM 级别持有的状态。HTTP 会话