当前位置: 首页 > 知识库问答 >
问题:

Spring Gateway AsyncPredicate不使用reactor和flux

元俊雅
2023-03-14

我们为Spring网关编写了一个自定义谓词工厂来路由请求。我们正在解析XML请求的主体,然后根据主体中存在的特定方法派生路由。在此过程中,我们编写了以下代码来创建ServerRquest。

@Override
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
        return exchange -> {
            Class<String> inClass = String.class;

            Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);

            if (cachedBody != null) {
                try {
                    boolean test = config.pattern.matcher((String) cachedBody).matches();
                    exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                    return Mono.just(test);
                } catch (ClassCastException e) {
                    LOG.error("Predicate test failed because String.class does not match the cached body object", e);
                }
                return Mono.just(false);
            } else {

                return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                    DataBufferUtils.retain(dataBuffer);

                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                            .bodyToMono(inClass).doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
                            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
                });

            }
        };
    }

使用较旧版本的Spring-Boot-家长(2.1.7. RELEASE)和sping-Cloud依赖项(Greenwich. RELEASE)完美地运行此解决方案。但使用最新版本的Spring-Boot-Pak(2.3.1. RELEASE)和spring-cloud依赖项(Hoxton. SR6)我得到以下异常。网关应用程序正常启动,没有任何错误。

Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
        at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
        ... 84 more

有没有其他人也有同样的问题,知道如何解决这个问题?

共有1个答案

壤驷华美
2023-03-14

问题是,那些apis的格林威治版本是beta。现在CACHED_REQUEST_BODY_ATTR中预期的对象必须是PooledDataBuffer。所以我现在相应地更改了我的代码。现在看起来如下所示:

return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                    DataBufferUtils.retain(dataBuffer);

                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                    PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());
                    
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                            .bodyToMono(inClass).doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
                            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
                });

在更新类之后,它现在正在按预期工作。

 类似资料:
  • 之前提到Vert.x API是事件驱动 - 当他们都可用时,Vert.x传递事件给处理程序。 在大多数情况下Vertx要求使用一种称为event loop线程的处理程序。 如无有 Vert.x 或您的应用程序块中,event loop可以欢快地运行将事件传递给不同的处理程序提供事件陆续到达。 因为没有阻塞,event loop可以在短时间内提供大量的事件。例如一个单一的event loop可以非常

  • 我终于学会了用Reactor进行函数式编程。所以我是新手。 我要做的第一件事是使用WebClient调用外部API。这个调用需要是递归的,因为响应提供了调用参数的下一个值,我需要在下一个调用中使用它,直到满足微不足道的情况。 下面是我的想法: 似乎我需要把我的想法调整到这种编程风格,所以请给我一些例子 谢谢

  • 我想从第三方资源检索所有页面。为此,我写了这个: 但它不能正常工作被多次调用,在检索第一个响应之前,它会使用不同的页面执行多个请求。 如果我改成这个: 效果很好。 那么,我如何使用实现这一点呢?

  • Reactor 是一个基于 JVM 之上的异步应用基础库。为 Java 、Groovy 和其他 JVM 语言提供了构建基于事件和数据驱动应用的抽象库。Reactor 性能相当高,在最新的硬件平台上,使用无堵塞分发器每秒钟可处理 1500 万事件。 开源中国组织翻译的 Reactor 中文文档:http://projectreactor.mydoc.io 示例代码: // This helper m

  • 我已经在每个spring boot项目中将spring boot依赖项设置为。这是正确的吗?或者我应该把它移到我父母的POM里? 我遇到了1个问题。当我从最上面的文件夹运行mvn包时,它并没有重新打包Spring Bootjar。2.我的spring boot项目列出了其他spring boot依赖项。使用spring boot starter作为父级,它们不需要版本标记。现在他们有了。我在父po

  • 我使用请求一组URL。大多数URL属于相同的主机。似乎会为每个URL创建一个全新的TCP连接,即使已经为上一个URL建立了到主机的连接。当数百个连接同时建立时,一些服务器会丢弃新的连接或开始缓慢响应。 代码示例: 在日志中,我看到同一个远程主机的本地端口不同,活动和非活动连接的总和远远高于不同主机的数量。这就是为什么我认为没有重用已经建立的连接。 是否可以使用HTTP客户端通过与主机的同一TCP连