当前位置: 首页 > 知识库问答 >
问题:

WebFlux应用程序中的WebFilter

阎咏思
2023-03-14
@Component
public class TraceIdFilter implements WebFilter {

@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    return chain.filter(exchange).subscriberContext((Context context) ->
        context.put(AuditContext.class, getAuditContext(exchange.getRequest().getHeaders()))
    );
}
@GetMapping(value = "/some_mapping")
public Mono<ResponseEntity<WrappedResponse>> getResource(@PathVariable("resourceId") String id) {
    Mono.subscriberContext().flatMap(context -> {
        AuditContext auditContext = context.get(AuditContext.class);
        ...
    });

我遇到的问题是filter方法永远不会执行,并且上下文也没有设置。我已经确认Webfilter是在启动时加载的。还有什么需要让过滤器工作吗?

共有1个答案

戴瑞
2023-03-14

我有一大堆问题弄清楚了,所以希望它能帮助别人。我的用例是验证请求上的签名。这要求我解析put/post的请求正文。我看到的另一个主要用例是日志记录,所以下面的内容也会有所帮助。

java

@Component
public class MiddlewareAuthenticator implements WebFilter { 

    @Autowired private RequestValidationService requestValidationService;

@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain chain) {
  return HEALTH_ENDPOINTS
      .matches(serverWebExchange)
      .flatMap(
          matches -> {
            if (matches.isMatch()) {
              return chain.filter(serverWebExchange);
            } else {
              return requestValidationService
                  .validate(serverWebExchange, 
                       new BiPredicate<ServerWebExchange, String> { 
                         @Override
                         public boolean test(ServerWebExchange e, String body) {
                             /** application logic can go here. few points:
                              1. I used a BiPredicate because I just need a true or false if the request should be passed to the controller. 
                              2. If you want todo other mutations you could swap the predicate to a normal function and return a mutated ServerWebExchange. 
                              3. I pass body separately here to ensure safety of accessing the request body and not having to rewrap the ServerWebExchange. A side affect of this though is any mutations to the String body do not affect downstream.
                              **/
                              return true;
                            }

                      })
                 .flatMap((ServerWebExchange r) -> chain.filter(r));
            }});
}

RequestValidationService.java

@Service
public class RequestValidationService {
private DataBuffer stringBuffer(String value) {
  byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

  NettyDataBufferFactory nettyDataBufferFactory =
      new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
  DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
  buffer.write(bytes);
  return buffer;
}

private String bodyToString(InputStream bodyBytes) {
  byte[] currArr = null;
  try {
    currArr = bodyBytes.readAllBytes();
    bodyBytes.read(currArr);
  } catch (IOException ioe) {
    throw new RuntimeException("could not parse body");
  }

  if (currArr.length == 0) {
    return null;
  }

  return new String(currArr, StandardCharsets.UTF_8);
}

private ServerHttpRequestDecorator requestWrapper(ServerHttpRequest request, String bodyStr) {
  URI uri = request.getURI();
  ServerHttpRequest newRequest = request.mutate().uri(uri).build();
  final DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
  Flux<DataBuffer> newBodyFlux = Flux.just(bodyDataBuffer);
  ServerHttpRequestDecorator requestDecorator =
      new ServerHttpRequestDecorator(newRequest) {
        @Override
        public Flux<DataBuffer> getBody() {
          return newBodyFlux;
        }
      };

  return requestDecorator;
}

private InputStream newInputStream() {
  return new InputStream() {
    public int read() {
      return -1;
    }
  };
}

private InputStream processRequestBody(InputStream s, DataBuffer d) {
  SequenceInputStream seq = new SequenceInputStream(s, d.asInputStream());
  return seq;
}

private Mono<ServerWebExchange> processInputStream(
    InputStream aggregatedBodyBytes,
    ServerWebExchange exchange,
    BiPredicate<ServerHttpRequest, String> predicate) {

  ServerHttpRequest request = exchange.getRequest();
  HttpHeaders headers = request.getHeaders();

  String bodyStr = bodyToString(aggregatedBodyBytes);

  ServerWebExchange mutatedExchange = exchange;

  // if the body exists on the request we need to mutate the ServerWebExchange to not
  // reparse the body because DataBuffers can only be read once;
  if (bodyStr != null) {
    mutatedExchange = exchange.mutate().request(requestWrapper(request, bodyStr)).build();
  }

  ServerHttpRequest mutatedRequest = mutatedExchange.getRequest();

  if (predicate.test(mutatedRequest, bodyStr)) {
    return Mono.just(mutatedExchange);
  }

  return Mono.error(new RuntimeException("invalid signature"));
}

/*
 * Because the DataBuffer is in a Flux we must reduce it to a Mono type via Flux.reduce
 * This covers large payloads or requests bodies that get sent in multiple byte chunks
 * and need to be concatentated.
 *
 * 1. The reduce is initialized with a newInputStream
 * 2. processRequestBody is called on each step of the Flux where a step is a body byte
 *    chunk. The method processRequestBody casts the Inbound DataBuffer to a InputStream
 *    and concats the new InputStream with the existing one
 * 3. Once the Flux is complete flatMap is executed with the resulting InputStream which is
 *    passed with the ServerWebExchange to processInputStream which will do the request validation
 */
public Mono<ServerWebExchange> validate(
    ServerWebExchange exchange, BiPredicate<ServerHttpRequest, String> p) {
  Flux<DataBuffer> body = exchange.getRequest().getBody();

  return body.reduce(newInputStream(), this::processRequestBody)
      .flatMap((InputStream b) -> processInputStream(b, exchange, p));
}

}

双谓词文档:https://docs.oracle.com/javase/8/docs/api/java/util/function/BiPredicate.html

 类似资料:
  • 上次我在考虑如何在我们的应用程序中正确使用记录器。例如,我有一个控制器,它返回一个用户流,但在日志中,我看到“Fetch users”日志是由另一个线程记录的,而不是处理管道上的线程,但这是一个好的方法吗? 在这种情况下,使用了两个线程,从我的角度来看,这不是一个好的选择,但我找不到在反应应用程序中使用记录器的好做法。我认为下面的方法更好,因为分配内存是来自处理线程,而不是来自spring web

  • 我使用Spring Boot 2.1. x与网络流量和安全性。我定义了一些类型的bean,它们被添加到MatcherSecurityWebFilterChain中。问题是,因为它们被定义为bean,所以它们也被添加在过滤链的末端,所以它们被执行两次。 对于Servlet应用程序,我们可以使用来避免这种情况: 对于反应性应用程序,什么是等效的?

  • 我有一个Spring reactive示例应用程序,它是从Spring Webflux文档中提供的一个示例修改而来的。该应用程序的分支以传统方式使用Spring Boot,并带有嵌入式应用服务器(Netty)。它工作得很好。 在Liberty分支中,我试图将应用程序构建为WAR,并将其部署到Websphere Liberty Profile。除了对构建过程的更改之外,最重要的代码更改是让我的(此处

  • 我正在运行一个spring boot WebFlux应用程序,通常该应用程序运行在Netty嵌入式服务器之上。相反,我正在运行一个tomcat实例,我试图从我的pom中排除tomcat,但仍然遇到同样的问题。 所以我想通过运行Netty而不是Tomcat来解决这个问题。 这是我的pom依赖项:

  • 关于如何构建CPU指标的可视化和洞察力的小问题。 我有一个Spring Boot Webflux应用程序,没什么特别的。我带来了执行器,千分尺和普罗米修斯依赖。 谢谢你。

  • Spring security 5.1.0。Rc1支持webflux中的OAuth2资源服务器https://spring.io/blog/2018/08/21/spring-security-5-1-0-rc1-released . 这里给出的例子讲述了基于JWT格式的OAuth2。我如何配置基于oAuth2的资源服务器并指定令牌解码uri。 在SpringMVC中,我可以使用@EnableRe