当前位置: 首页 > 知识库问答 >
问题:

反应式编程-Webflux Webfilter运行不正常

路奇
2023-03-14

我对反应式编程有点陌生,我正在尝试组装以下组件:使用Java、Springboot 2、Webflux和reactor core,我想处理需要额外身份验证的非常特定的请求。因此,我通过一系列步骤实现了一个Web过滤器:

  • 捕获请求的路径和方法。检查该组合是否存在,是否需要使用accessPointService进行特定身份验证。getAccessPointAuthorizationRequirement方法(返回带布尔值的Mono)
  • 因为我配置了CSRF和Spring Security性,所以我需要CSRF令牌和springsession凭据。我对凭据发出GET和POST请求
  • 然后使用凭据,我只需向一个服务(authcheck)发出POST请求,该服务可以执行一系列安全检查(该服务正常,从Postman和Angular可以正常工作)
  • 之后,我需要检索主体,将其转换为字符串,并对其进行检查。现在这种情况没有发生

过滤器

@Override
    public Mono<Void> filter(final ServerWebExchange serverWebExchange, final WebFilterChain webFilterChain) {

        //client for specific requests.
        WebClient webClient = WebClient.builder()
                .baseUrl("http://localhost:8080")
                .build();
        //get request for the CSRF cookie.
        WebClient.RequestHeadersSpec<?> getRequest = webClient.get()
                .uri("/login");
        //post request for the spring security session cookie.
        WebClient.RequestHeadersSpec<?> postRequest = webClient.post()
                .uri("/login")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
                .body(BodyInserters.fromFormData("username", "username")
                        .with("password", "password"));
                        //services that checks if the given request needs extra authentication
        return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), serverWebExchange.getRequest().getPath().toString())
                .log()
                //gets the csrf token from the GET request
                .flatMap(isRequired -> getRequest.exchangeToMono(response -> Mono.just(response.cookies().getFirst("XSRF-TOKEN").getValue())))
                //combines the previous token with the POST request SESSION cookie,
                //THEN secures the last request with both credentials
                .zipWith(postRequest.exchangeToMono(resp -> Mono.just(resp.cookies().getFirst("SESSION").getValue())),
                        AuthenticationFilter::secureAuthRequest)
                //gets the exchange from the request and converts the body into a String
                .flatMap(AuthenticationFilter::getRequestExchange)
                //code to validate if it's doing something. Not implemented yet because it never executes.
                .flatMap(s -> Mono.just(s.equals("")))
                .onErrorResume(e -> {
                    throw (CustomException) e;//breaks the execution
                })
                .then(webFilterChain.filter(serverWebExchange));//continues the execution
    }

调用了secureAuthRequest和getRequestExchange方法

//adds the springsession cookie and csrf cookie to the request
private static WebClient.RequestHeadersSpec<?> secureAuthRequest(String csrf, String spring) {

        WebClient webClient = WebClient.builder()
                .baseUrl("http://localhost:8080")
                .build();
        WebClient.RequestHeadersSpec<?> request = webClient.post()
                .uri("/authcheck")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
        request.header("X-XSRF-TOKEN", csrf);
        request.cookies( cookies -> cookies.add(  "XSRF-TOKEN", csrf) );
        request.header("Authorization", spring);
        return request;
    }

//gets the body as string.
private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {

        return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
    }

但是,当请求被绑定为要进行身份验证时,日志如下所示:

2021-10-26 23:57:18.760  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-10-26 23:57:18.761  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | request(unbounded)
2021-10-26 23:57:18.761  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | onNext(true)
2021-10-26 23:57:18.762  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | onComplete()

据我所知,数据流从一个订阅和一个后请求开始(我认为从accessPointService.getAccessPointAuthorizationRequirement方法Mono值返回TRUE,如果我错了,请更正),但随后会显示“onComplete()”日志。我不知道onComplete()日志的确切含义,因为它是在执行getRequestExchange方法(调用该方法)之前显示的。单声道。只有(s.equals(“”)一段代码永远不会执行。

我读过很多关于“订阅之前什么都不会发生”的书,但我仍然不知道如果我从未明确订阅流,为什么会调用反应流,也不知道如何实现它,因为它只返回一次性的(我想我可以从内部抛出异常?)。此外,我听说在调用多个订阅者时要进行解耦,所以我尽量避免使用它们。

任何关于反应性编程、Reactor堆芯或特定流程以及如何改进的帮助,我们都将不胜感激。

干杯

共有1个答案

田晨
2023-03-14

因此,在进行了一些研究之后,感谢@Toerktumlare的评论,并弄清楚了发生了什么,以及我对此做了哪些更改/应用。

因此,对于“onComplete()”日志,它标志着数据生产者的结束。因此,为了查看操作的完整堆栈,我需要将每个生产者与自己的日志链接起来。例如:

Mono.just(Boolean.FALSE)
    .log()
    .flatMap(booleanVal -> Mono.just(booleanVal.toString()))
    .log()
    .subscribe(stringVal -> System.out.println("This is the boolean value " + stringVal));

这将为初始生产者和平面图操作生成跟踪。

现在,关于主要问题,问题出现在getRequestExchange方法中:

//gets the body as string.
private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {

        return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
    }

问题隐藏在BodyTomino方法中。根据本网站https://medium.com/@Jeevjyotsinghchabda/dont-let-webclient-s-bodytomono-trick-you-645123b3e0a9,如果此请求的响应由于任何原因没有正文,则不会抛出任何错误,只需返回一个Mono。空()。因为流程没有为这样的生产商做好准备,所以就在这里结束了。

在我的例子中,问题是spring云安全。我在请求中提供了授权凭据,但没有提供关联的会话cookie。因此,请求返回了一个302(找到),没有正文。这就是问题所在(不是反应流本身)。

所以,在那之后,我修改了请求,@Toerkumlare的评论帮助我开发了一个工作解决方案:

//service that returns if certain resource needs authentication or not, or if it's not even configured
return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), FWKUtils.translateAccessPointPath(serverWebExchange.getRequest().getPath().pathWithinApplication().elements()))
                //if the response is a Mono Empty, then returns a not acceptable exception
                .switchIfEmpty(Mono.defer(() -> throwNotAcceptable(serverWebExchange)))
                //takes the boolean value to check if extra auth is needed.
                .flatMap(isRequired -> validateAuthenticationRequirement(isRequired))
                //gets the access token - the extra auth credential
                .flatMap(isRequired -> getHeaderToken(serverWebExchange))
                //from this access generates a WebClient to the specific authentication service - from a webClientProvider to not create too many WebClients.
                .flatMap(accessToken -> generateAuthenticationRequest(webClientProvider.getInstance(), accessToken))
                //gets the CRSF token credential and secures the request (adds it to the header and the cookies)
                .zipWith(getCredential(webClientProvider.getInstance(), "csrf"), (securedRequest, csrfToken) -> secureAuthenticationRequest(securedRequest, csrfToken, "X-XSRF-TOKEN", "XSRF-TOKEN"))
                //gets the SESSION (spring cloud security) token credential and secures the request (adds it to the header and the cookies)
                .zipWith(getCredential(webClientProvider.getInstance(), "spring-cloud"), (securedRequest, sessionToken) -> secureAuthenticationRequest(securedRequest, sessionToken, "Authorization", "SESSION"))
                //does the request and gets the response
                .map(requestBodySpecs -> requestBodySpecs.retrieve())
                //from the response, maps it to a specific DTO. The single() clause is to validate that a body is present.
                .flatMap(clientResponse -> clientResponse.bodyToMono(SecurityCredentialResponseDTO.class).single())
                //checks the authentication and throws a Unauthorizedstatus if its not valid.
                .flatMap(responseDTO -> checkTokenAuthentication(serverWebExchange, responseDTO))
                //if an error is present, then throws it 
                .onErrorResume(e -> {
                    if (e instanceof FWKException.GenericException) {
                        throw (FWKException.GenericException) e;
                    }
                    throw (RuntimeException) e;
                })
                //finally, continues the execution if no exception was thrown.
                .then(webFilterChain.filter(serverWebExchange));

我在这个解决方案中还实现了更多功能(存储CSRF和spring云凭据以避免不必要的调用)。

 类似资料:
  • 我开始更多地接受反应式编程,并试图将其应用于我的典型业务问题。我经常使用的一种模式是数据库驱动类。我有一些已定义的单元类,如ActionProfile,其实例由ActionProfileManager管理,它从数据库表中创建实例并将其存储在映射中 然而,如果我想让它更具反应性,那么创建地图会打破单子。我可以做的一种方法是将映射本身设置为可观察的,并返回一个monad来为客户端查找特定的键。然而,中

  • 反应式编程是一种编程范式,用于处理数据流和变化的传播。 这意味着当一个组件发出数据流时,更改将通过响应式编程库传播到其他组件。 变化的传播将持续到最终接收器。 事件驱动和反应式编程之间的区别在于事件驱动的编程围绕事件而反应式编程围绕数据。 ReactiveX或RX用于反应式编程 ReactiveX或Raective Extension是最着名的反应式编程实现。 ReactiveX的工作取决于以下两

  • null 在这两种实现中,REST控制器都直接从存储库中获取数据,并将其作为列表RESP返回。作为通量。不执行进一步的应用程序逻辑。 我们对100个调用服务的用户进行了一个小型负载/性能测试,我们发现非反应实现的性能远远好于反应实现。 事实上,非反应实现不仅有更好的HTTP吞吐量,而且更有趣的是,它比反应实现消耗更少的CPU和线程!这与预期尤其相反,因为我们预期反应版本只需少量线程即可扩展,如ht

  • 问题内容: 在C语言中,如何以编程方式找出Linux / Ubuntu上是否已在运行某个进程,以避免该进程两次启动?我正在寻找类似于pidof的东西。 问题答案: 您可以进入这些条目并在文件中检查过程,也可以在链接上执行操作(以下使用第一种方法)。

  •   函数式反应型编程是两个声明式编程的子范例(函数式+反应式)的组合。这里我们先来理解反应式编程,因为它非常简单。   反应式编程在表处理方面十分强悍。假设我们有一个表格A:她是用来纪录其他两个表格(表格B、表格C)的和。当表格B或C当中任意一个值发现变化时,这些变化都会通过表实时改变表格A的值。总之,我们定义好了A是B和C的和,不管发生了什么,A会一直响应B或C的变化,永远都是B与C的和。   

  • 我的iMac上有一个完整的react本机环境,当我运行react本机运行ios时,项目完全运行,但当我使用android执行此操作时,它不工作,metro bundler窗口打开但不工作,并且不显示任何消息。 我必须做什么? 运行“react native run android”后我的终端 运行Android系统后,我的终端的打印屏幕,以及在它下面的metro捆绑包