当前位置: 首页 > 知识库问答 >
问题:

Spring Webflux(Mono/Flux),AOP在拦截时触发REST调用并使用Mono/Flux

古扬
2023-03-14

我写了一个@Aspect来拦截以Mono/Flux返回值的被动方法。使用@AfterReturning advice,我试图通过调用webservice发出APNS通知。

不幸的是,processNotification Mono服务在没有执行调用链的情况下立即返回onComplete信号。下面是我的示例程序。

@Aspect
@Component
@Slf4j
public class NotifyAspect{
    private final NotificationServiceHelper notificationServiceHelper;

    @Autowired
    public NotifyAspect(NotificationServiceHelper notificationServiceHelper) {
        this.notificationServiceHelper = notificationServiceHelper;
    }

    @AfterReturning(pointcut = "@annotation(com.cupid9.api.common.annotations.Notify)", returning = "returnValue")
    public void generateNotification(JoinPoint joinPoint, Object returnValue) throws Throwable {
        log.info("AfterReturning Advice - Intercepting Method : {}", joinPoint.getSignature().getName());

        //Get Intercepted method details.
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        //Get the Notification Details.
        Notify myNotify = method.getAnnotation(Notify.class);
        if (Mono.class.isAssignableFrom(returnValue.getClass())) {
            Mono<Object> result = (Mono<Object>) returnValue;
            result.doOnSubscribe(o -> {
                log.debug("On Subscription...");
                notificationServiceHelper.processNotification(myNotify.notificationType())
                .doOnError(throwable -> {
                    log.error("Exception in notification processor",throwable);
                });
            });
        }
    }

}
@Slf4j
@Service
public class NotificationServiceHelper {
    private ReactiveUserProfileRepository userProfileRepository;

    @Value("${services.notification.url}")
    private String notificationServiceUrl;

    private RestWebClient restWebClient;

    @Autowired
    public NotificationServiceHelper(RestWebClient restWebClient,
                                     ReactiveUserProfileRepository reactiveUserProfileRepository) {
        this.restWebClient = restWebClient;
        this.userProfileRepository = reactiveUserProfileRepository;
    }

    public Flux<Notification> processNotification(NotificationSchema.NotificationType notificationType) {
        /*Get user profile details*/
        return SessionHelper.getProfileId()
                .switchIfEmpty( Mono.error(new BadRequest("Invalid Account 1!")))
                .flatMap(profileId ->
                        Mono.zip(userProfileRepository.findByIdAndStatus(profileId, Status.Active), SessionHelper.getJwtToken()))
                .switchIfEmpty( Mono.error(new BadRequest("Invalid Account 2!")))
                .flatMapMany(tuple2 ->{
                    //Get user details and make sure there are some valid devices associated.
                    var userProfileSchema = tuple2.getT1();
                    log.info("Processing Notifications for User Profile : {}", userProfileSchema.getId());
                    if (Objects.isNull(userProfileSchema.getDevices()) || (userProfileSchema.getDevices().size() < 1)) {
                        return Flux.error(new InternalServerError("No Devices associate with this user. Can not send notifications."));
                    }

                    //Build Notification message from the Notification Type
                    var notificationsMap = new LinkedHashSet<Notification>();
                    userProfileSchema.getDevices().forEach(device -> {
                        var notificationPayload = Notification.builder()
                                .notificationType(notificationType)
                                .receiverDevice(device)
                                .receiverProfileRef(userProfileSchema.getId())
                                .build();
                        notificationsMap.add(notificationPayload);
                    });

                    //Get session token for authorization
                    var jwtToken = tuple2.getT2();

                    //Build the URI needed to make the rest call.
                    var uri = UriComponentsBuilder.fromUriString(notificationServiceUrl).build().toUri();
                    log.info("URI built String : {}", uri.toString());

                    //Build the Headers needed to make the rest call.
                    var headers = new HttpHeaders();
                    headers.add(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE);
                    headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
                    headers.add(HttpHeaders.AUTHORIZATION, jwtToken);

                    var publishers = new ArrayList<Mono<ClientResponse>>();
                    notificationsMap.forEach(notification -> {
                        publishers.add(restWebClient.post(uri, headers, notification));
                    });
                    return Flux.merge(publishers).flatMap(clientResponse -> {
                        var httpStatus = clientResponse.statusCode();
                        log.info("NotificationService HTTP status code : {}", httpStatus.value());
                        if (httpStatus.is2xxSuccessful()) {
                            log.info("Successfully received response from Notification Service...");
                            return clientResponse.bodyToMono(Notification.class);
                        } else {
                            // return Flux.empty();
                            return clientResponse.bodyToMono(Error.class)
                                    .flatMap(error -> {
                                        log.error("Error calling Notification Service :{}", httpStatus.getReasonPhrase());
                                        if (httpStatus.value() == 400) {
                                            return Mono.error(new BadRequest(error.getMessage()));
                                        }
                                        return Mono.error(new InternalServerError(String.format("Error calling Notification Service : %s", error.getMessage())));
                                    });
                        }
                    });
                }).doOnError(throwable -> {
                    throw new InternalServerError(throwable.getMessage(), throwable);
                });
    }

}

我们如何在不等待侦听的情况下异步触发此调用。。目前,processNotification总是在不执行的情况下返回onComplete信号。链未按预期执行

共有1个答案

邓德本
2023-03-14
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Log {

     public String title() default "";

}


@SuppressWarnings({"unchecked"})
@Around("@annotation(operlog)")
public Mono<Result> doAround(ProceedingJoinPoint joinPoint, Log operlog) {
    Mono<Result> mono;

    try {
        mono = (Mono<Result>) joinPoint.proceed();
    } catch (Throwable throwable) {
        throw new RuntimeException(throwable);
    }

    return mono.doOnNext(result -> {
                //doSomething(result);
            };

}
 类似资料:
  • 我的代码是这样构造的- 我正在努力实现这一点: 方法1()返回地址时,我需要使用它并调用方法2()来更新MongoDB文档中的地址。也没有抛出异常。但是我没有看到任何日志在方法2() 代码: 虽然调用了method2(),但MongoDB中的文档更新没有发生。

  • 我有这个场景。我有一个分页的API,它给我过去12个月的数据。API的响应是这样的: 现在我必须收集所有的数据,然后计算所有的总和,并返回为

  • 我确实有一个方法,它成功地获取记录,我正在迭代它,以获取基于工人id的Mono对象,成功地返回对象,但我不能创建最终的通量,它应该包括的WorkerDTO(正常Spring Boot应用程序的WorkerDTO列表),但它返回空对象i. e

  • 我正在用Spring web-flux和Reactor一起使用,对我来说,不清楚RestController方法何时返回

  • 我在SpringBoot中有一个方法,可以进行多个mono调用,返回一个类型字符串。此方法最终将返回所有mono调用的结果,这些调用随后将转换为POJO对象。我试过单声道。但是它只接受8个元组。有更好的解决办法吗? eg:

  • 我想从Flux/Mono中获取对象。我使用 我会这样做: 我有错误: 为什么?有什么不同的方法来获取对象? 在反应式编程中,如何做到:在RequestBody中,您有UserDto。 如果不创建用户,请检查数据库中是否存在电子邮件。