当前位置: 首页 > 知识库问答 >
问题:

RSocket和Spring不能处理多个请求

潘弘博
2023-03-14

我和Spring Boot一起玩RSocket。我想做一个简单的请求-响应示例。作为示例,我从这个链接中获取代码:

当我运行示例代码不做更改时,我会在异常请求期间得到错误。这个错误不是这个问题的重点,但我只是想展示由Baeldung对原始源代码的更改。

[reactor-tcp-nio-1]org.springframework.core.log.compositeLog:[5927A44D-9]500 HTTP GET“/current/pko”io.rsocket.exceptions.applicationerRoreXception:在io.rsocket.exceptions.exceptions.from(exceptions.java:76)没有destination“的处理程序:reactor.core.publisher.fluxonassemblyException:在以下站点观察到错误:_checkpoint handler”[ExceptionHandlingWebHandler]堆栈跟踪:at io.rsocket.exceptions.exceptions.from(Exceptions.java:76)at io.rsocket.core.rsocketRequester.handleframe(rsocketRequester.java:706)at io.rsocketRequester.handleincomingFrames(rsocketRequester.java:640)at reactor.core.publisher.lambdaSubscriber.onnext(Er.FLuxgroupby$unicastgroupdflux.drainregular(fluxgroupby.java:554)在reactor.core.publisher.fluxgroupby$unicastgroupby.java:630)在reactor.core.publisher.fluxgroupby$unicastgroupby.drain(fluxgroupby.java:630)在reactor.core.publisher.fluxgroupby$unicastgroupby.java:670)在mapconditionalsubscriber.onnext(fluxmap.java:213)在reactor.core.publisher.fluxmap$mapconditionalsubscriber.onnext(fluxmap.java:213)在reactor.netty.channel.fluxreceive.drainreceive(fluxreceive.java:213)在reactor.netty.channel.fluxreceive.drainreceive(fluxreceive.java:260)在InvokeChannelRead(AbstractChannelHandlerContext.java:379)at io.netty.channel.abstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)在io.netty.channel.abstractChannelContext.fireChannelRead(AbstractChannelHandlerContext.java:365)在io.netty.channel.abstractChannelContext.fireChannelRead(AbstractChannelHandlerContext.java:357)在handlerContext.java:379)在io.netty.channel.abstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)在io.netty.channel.abstractChannelContext.java:365),在io.netty.channel.abstractChannelContext.fireChannelRead(AbstractChannelHandlerContext.java:357)在埃尚内尔里德(AbstractChannelHandlerContext.java:365)在io.netty.channel.defaultChannelPipeline.firechannelRead(DefaultChannelPipeline.java:919)在io.netty.channel.nio.abstractNiobyTechannel$niobyTeUnsafe.Read(AbstractNiobyTechannel.java:163)在io.netty.channel.nio.nioEventLoop.processSelectedKey(NioEventLoop.java:714)在在io.netty.channel.nio.nioEventLoop.run(nioEventLoop.java:493),在io.netty.util.concurrent.singleThreadEventExecutor$4.run(singleThreadEventExecutor.java:989),在io.netty.util.internal.ThreadExecutorMap$2.run(threadExecutorMap.java:74),在io.netty.util.concurrent.fastThreadLocalRunnable.java:30,在

所以我将客户端代码从

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        return RSocketFactory.connect()
                             .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
                             .frameDecoder(PayloadDecoder.ZERO_COPY)
                             .transport(TcpClientTransport.create(7000))
                             .start()
                             .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}

@Configuration
public class ClientConfiguration {
    

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectTcp("localhost", 7000)
                .block();
    }
}

这个小改动帮助,不发生异常。另一个问题是,来自客户端(请求者)的请求由服务器(响应者)一个接一个地处理。我创建了SOAPUI REST项目,并在2个线程中运行GET请求。它看起来像服务器使用单线程。这不是我期望达到的目标。

@Controller
public class MarketDataRSocketController {

    Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);

    private final MarketDataRepository marketDataRepository;

    public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
        this.marketDataRepository = marketDataRepository;
    }

    @MessageMapping("currentMarketData")
    public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
        logger.info("Getting data for: "+marketDataRequest);
        Mono<MarketData> result = marketDataRepository.getOne(marketDataRequest.getStock());
        logger.info("Controller thread move forward: "+marketDataRequest);
        return result;
    }

    @MessageExceptionHandler
    public Mono<MarketData> handleException(Exception e) {
        return Mono.just(MarketData.fromException(e));
    }
}
@Component
public class MarketDataRepository {
    Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);
    private static final int BOUND = 100;
    private Random random = new Random();

    public Mono<MarketData> getOne(String stock) {
        //return return Mono.just(getMarketDataResponse(stock)); original code from baeldung.
        return Mono.just(stock).map(s -> getMarketDataResponse(s));
    }

    private MarketData getMarketDataResponse(String stock) {
        logger.info("Repository thread go speel ZzzZZ");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        logger.info("Repository thread move forward");
        return new MarketData(stock, random.nextInt(BOUND));
    }
}

@Configuration
public class ClientConfiguration {


    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectTcp("localhost", 7000)
                .block();
    }
}
@RestController
public class MarketDataRestController {
    Logger logger = LoggerFactory.getLogger(MarketDataRestController.class);

    private final Random random = new Random();
    private final RSocketRequester rSocketRequester;

    public MarketDataRestController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping(value = "/current/{stock}")
    public Publisher<MarketData> current(@PathVariable("stock") String stock) {
        logger.info("Get REST call for stock : "+stock);
        return rSocketRequester.route("currentMarketData")
                               .data(new MarketDataRequest(stock))
                               .retrieveMono(MarketData.class);
    }
}

在客户端日志中,我得到:

2021-09-01 11:30:14,614信息[reactor-http-nio-2]com.baeldung.spring.rsocket.client.marketDataRestController:获取股票的REST调用:pko
2021-09-01 11:30:14,691信息[reactor-http-nio-3]com.baeldung.spring.rsocket.client.marketDataRestController:获取股票的REST调用:pko

在服务器中,我获得如下日志

//从客户端获取数据
2021-09-01 11:30:14,843 INFO[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketdatarsocketController:获取数据:MarketDataRequest(stock=pko)
//记录Contoller线程在调用存储库后前进
2021-09-01 11:30:14,844 INFO[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketDataRocketController线r>2021-09-01 11:30:14,862信息[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketDataRepository:repository thread go speel zzzzz
//repository finish work
2021-09-01 11:30:24,863信息[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketDataRepository thread move forward

服务器只处理单个调用,等待存储库完成作业。然后以类似的方式处理下一个请求:

2021-09-01 11:30:24,874信息[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketdatarsocketcontroller:获取数据:MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874信息[reactor-http-nio-3]com.baeldung.spring.rsocketcontroller cket.server.marketdatarepository:Repository thread go speel zzzzz
2021-09-01 11:30:34,876 INFO[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketdatarepository:Repository thread move forward

我不明白为什么服务器在一个接一个地处理调用。也许代码中有问题,或者我没有理解正确的东西。提前谢谢你。

共有1个答案

师向文
2023-03-14

在Reactor中,默认情况下,所有内容都在主线程上运行。调用thread.sleep,主线程阻塞,应用程序冻结。如果希望模拟长时间运行的操作,可以使用delayElements运算符

.delayElements(Duration.ofSeconds(10));

注:Reactor阻塞猎犬检测并报告此类阻塞呼叫。

 类似资料:
  • 我正在使用Spring Boot构建一个RESTful web服务。我的IDE是Eclipse Oxygen。 这里是我的控制器代码: 我的控制台输出是: 控制台输出显示每5秒调用一次控制器。但我每隔两秒就发送一次请求。 是否可以接受来自同一会话的并发多个请求? 谢谢!

  • 问题内容: 我的Flask应用程序必须进行大量计算才能获取特定页面。在Flask执行该功能时,其他用户无法访问该网站,因为Flask忙于进行大量计算。 有什么方法可以使我的Flask应用程序接受来自多个用户的请求? 问题答案: 是的,将应用程序部署在其他WSGI服务器上,请参阅Flask部署选项文档。 Flask随附的服务器组件实际上仅用于开发应用程序时;即使可以将其配置为处理并发请求(从Flas

  • 问题内容: 我所拥有的: 我有一个nodejs express服务器获取端点,该端点依次调用其他耗时的API(例如大约2秒)。我已经通过回调调用了此函数,使得res.send作为回调的一部分被触发。res.send对象打包了一个对象,该对象将在执行这些耗时的API调用的结果之后创建。因此,仅当我从API调用中获得全部信息时,才能发送我的res.send。 一些代表性的代码。 我要什么 我希望我的服

  • 表模式如下: 表A的主键[ID1(分区键)id2(分区键)id3(群集键)] 表B主键[ID1(分区键)id2(分区键)状态(聚类键)id3(聚类键)] 那么在卡桑德拉我该怎么解决呢?

  • 问题内容: 我有一个返回promise的方法,并且在内部该方法调用一个API,该API每分钟只能有20个请求。问题是我有很多对象(大约300个),并且我想为每个对象调用API。 目前,我有以下代码: 但是它不处理时序约束。我希望我可以使用_.chunk和_.debounce之类的东西,但是我无法解决这个问题。有人可以帮我吗? 问题答案: 您可以每分钟发送1个包含20个请求的块,或者每3秒将其间隔1

  • 假设我有一个Employee类。它有很多字段,比如id、名字、姓氏、姓名、年龄、薪水和其他字段。现在,我正在进行一个Get查询,希望使用所有这些字段(required=false)作为请求参数传递。 但问题是,可能有许多组合,如(firstName, age)或(age,工资,lastName)或(指定,年龄,工资,lastName)等等。那么我应该如何处理所有这些筛选器。我必须为每个案例编写每个