当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Gateway:后过滤Web客户端请求

吉栋
2023-03-14

我们使用Spring Cloud Gateway将请求路由到多个底层服务。对这些底层服务的调用将是连续的,并可能相互传递(来自一个服务的响应在下一个服务的请求中使用)。当我们需要在主请求之前顺序地发出这些请求时,我们有一个可行的解决方案,但在主请求之后,我们在将一个代理请求的响应输入到下一个代理请求的请求中时遇到了问题。

我们计划将响应从一个请求发送到下一个请求的方式是使用GatewayFilter中的WebClient发出请求,并将响应字符串存储在Exchange的属性存储区中。然后,在下一个代理请求期间,我们提供一个属性名,可选地从中提取请求正文。这在使用“预”过滤器时很好,因为第一个代理请求在第二个请求构建和执行之前就已经构建、执行和缓存了响应,所以属性链按预期工作。当使用“POST”过滤器时,问题就来了。在post代理中,web客户端请求都是在后续请求完成之前构建的。因此,属性存储区从来没有来自上一个请求的响应,这意味着下一个请求不能按预期工作,因为它没有有效的请求体。

我的理解是调用chain.filter(exchange).then(mono.fromrunnable{...})将导致.then逻辑仅在前面的筛选器完全完成后才执行。事实似乎并非如此。在其他筛选器类型中,如日志记录、响应操作等,post筛选器以正确的顺序执行,但在创建WebClient时,它们似乎不是这样。

有人对如何实现这种期望的行为有什么想法吗?

class PreProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PreProxyGatewayFilterFactory.Params>(Params::class.java) {
    override fun apply(params: Params): GatewayFilter {
        return OrderedGatewayFilter(
            { exchange, chain ->
                ServerWebExchangeUtils.cacheRequestBody(exchange){
                    val cachedExchange = exchange.mutate().request(it).build()
                    executeRequest(cachedExchange, params)
                        .map { response ->
                            val body = response.body.toString()
                            cacheResponse(
                                response.body.toString(),
                                params.cachedResponseBodyAttributeName,
                                cachedExchange
                            )
                        }
                        .flatMap(chain::filter)
                }
            }, params.order)
    }

    private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
        if(!attributeName.isNullOrBlank()){
            exchange.attributes[attributeName] = response
        }
        return exchange
    }

    private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
        val request = when(exchange.request.method){
            HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
            HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
            else -> throw Exception("Invalid request method passed in to the proxy filter")
        }
        return request.headers { headers ->
            headers.addAll(exchange.request.headers)
            headers.remove(CONTENT_LENGTH)
        }
        .exchange()
        .flatMap{ response ->
            response.toEntity(String::class.java)
        }
    }

    private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
        val cachedBody = attributeName?.let { attrName ->
            exchange.getAttributeOrDefault<String>(attrName, "null")
        } ?: "null"
        return if(cachedBody != "null"){
            BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
        } else {
            BodyInserters.fromDataBuffers(exchange.request.body)
        }
    }

    data class Params(
            val proxyPath: String = "",
            val cachedRequestBodyAttributeName: String? = null,
            val cachedResponseBodyAttributeName: String? = null,
            val order: Int = 0
    )
}
class PostProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PostProxyGatewayFilterFactory.Params>(Params::class.java) {
    override fun apply(params: Params): GatewayFilter {
        return OrderedGatewayFilter(
            { exchange, chain ->
                ServerWebExchangeUtils.cacheRequestBody(exchange){
                    val cachedExchange = exchange.mutate().request(it).build()

                    //Currently using a cached body does not work in post proxy
                    chain.filter(cachedExchange).then( Mono.fromRunnable{
                        executeRequest(cachedExchange, params)
                            .map { response ->
                                cacheResponse(
                                    response.body.toString(),
                                    params.cachedResponseBodyAttributeName,
                                    cachedExchange
                                )
                            }
                            .flatMap {
                                Mono.empty<Void>()
                            }
                    })
                }
            }, params.order)
    }

    private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
        if(!attributeName.isNullOrBlank()){
            exchange.attributes[attributeName] = response
        }
        return exchange
    }

    private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
        val request = when(exchange.request.method){
            HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
            HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
            else -> throw Exception("Invalid request method passed in to the proxy filter")
        }
        return request.headers { headers ->
            headers.addAll(exchange.request.headers)
            headers.remove(CONTENT_LENGTH)
        }
        .exchange()
        .flatMap{ response ->
            response.toEntity(String::class.java)
        }
    }

    private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
        val cachedBody = attributeName?.let { attrName ->
            exchange.getAttributeOrDefault<String>(attrName, "null")
        } ?: "null"
        return if(cachedBody != "null"){
            BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
        } else {
            BodyInserters.fromDataBuffers(exchange.request.body)
        }
    }


    data class Params(
            val proxyPath: String = "",
            val cachedRequestBodyAttributeName: String? = null,
            val cachedResponseBodyAttributeName: String? = null,
            val order: Int = 0
    )
}

共有1个答案

邹山
2023-03-14

最终能够为post filter代理找到一个有效的解决方案,从属性中提取它的请求体。这是一个相对简单的修复,我只是找不到答案。我只需要使用chain.filter(exchange).然后(mono.fromrunnable{...execute proxy request...}),而不是使用chain.filter(exchange).然后(mono.defer{...execute proxy request...})

 类似资料:
  • 客户端的HTTP/HTTPS请求。 进程:主进程​ ClientRequest是由EventEmitter来实现Writable Stream​ new ClientRequest(options) 作用:发起新的HTTP/HTTPS请求 options(Object | String) - options是String时即请求URL。 options 是Object时则按以下属性请求: meth

  • 我正在使用Spring WebClient检索福布斯全球2000强。它工作正常,但现在他们在试图访问https://www . Forbes . com/Forbes API/org/global 2000/2020/position/true . JSON上的JSON数据时添加了一个cookie同意模式。限制=2000 从技术上讲,这是我可以从Chrome看到的网络流: > 获取 https:/

  • 过滤请求和响应,可以提供有用的低层次的概念集中在某一个独立的方面或域,将应用层和发送请求、执行响应相解耦。过滤器可以读取/修改请求的URI,头文件和实体或读取/修改响应状态,头文件和实体。 Jersey 包含以下有用的客户端过滤器(和功能注册的过滤器),你可以在你的应用程序中使用: CsrfProtectionFilter:跨站请求伪造保护过滤器(加 X-Requested-By 到每个状态改变的

  • httplib 库主要用来模拟客户端发送 HTTP 请求,类似于 Curl 工具,支持 JQuery 类似的链式操作。使用起来相当的方便;通过如下方式进行安装: go get github.com/astaxie/beego/httplib 如何使用 首先导入包 import ( "github.com/astaxie/beego/httplib" ) 然后初始化请求方法,返回对象 r

  • 当浏览器请求一个网页时,它会向网络服务器发送一系列不能被直接读取的信息,因为这些信息是作为HTTP信息头的一部分来传送的。您可以查阅HTTP协议来获得更多的信息。 下表列出了浏览器端信息头的一些重要内容,在以后的网络编程中将会经常见到这些信息: 信息 描述 Accept 指定浏览器或其他客户端可以处理的MIME类型。它的值通常为 image/png 或 image/jpeg Accept-Char