我和Spring Boot一起玩RSocket。我想做一个简单的请求-响应示例。作为示例,我从这个链接中获取代码:
当我运行示例代码不做更改时,我会在异常请求期间得到错误。这个错误不是这个问题的重点,但我只是想展示由Baeldung对原始源代码的更改。
[reactor-tcp-nio-1]org.springframework.core.log.compositeLog:[5927A44D-9]500 HTTP GET“/current/pko”io.rsocket.exceptions.applicationerRoreXception:在io.rsocket.exceptions.exceptions.from(exceptions.java:76)没有destination“的处理程序:reactor.core.publisher.fluxonassemblyException:在以下站点观察到错误:_checkpoint handler”[ExceptionHandlingWebHandler]堆栈跟踪:at io.rsocket.exceptions.exceptions.from(Exceptions.java:76)at io.rsocket.core.rsocketRequester.handleframe(rsocketRequester.java:706)at io.rsocketRequester.handleincomingFrames(rsocketRequester.java:640)at reactor.core.publisher.lambdaSubscriber.onnext(Er.FLuxgroupby$unicastgroupdflux.drainregular(fluxgroupby.java:554)在reactor.core.publisher.fluxgroupby$unicastgroupby.java:630)在reactor.core.publisher.fluxgroupby$unicastgroupby.drain(fluxgroupby.java:630)在reactor.core.publisher.fluxgroupby$unicastgroupby.java:670)在mapconditionalsubscriber.onnext(fluxmap.java:213)在reactor.core.publisher.fluxmap$mapconditionalsubscriber.onnext(fluxmap.java:213)在reactor.netty.channel.fluxreceive.drainreceive(fluxreceive.java:213)在reactor.netty.channel.fluxreceive.drainreceive(fluxreceive.java:260)在InvokeChannelRead(AbstractChannelHandlerContext.java:379)at io.netty.channel.abstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)在io.netty.channel.abstractChannelContext.fireChannelRead(AbstractChannelHandlerContext.java:365)在io.netty.channel.abstractChannelContext.fireChannelRead(AbstractChannelHandlerContext.java:357)在handlerContext.java:379)在io.netty.channel.abstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)在io.netty.channel.abstractChannelContext.java:365),在io.netty.channel.abstractChannelContext.fireChannelRead(AbstractChannelHandlerContext.java:357)在埃尚内尔里德(AbstractChannelHandlerContext.java:365)在io.netty.channel.defaultChannelPipeline.firechannelRead(DefaultChannelPipeline.java:919)在io.netty.channel.nio.abstractNiobyTechannel$niobyTeUnsafe.Read(AbstractNiobyTechannel.java:163)在io.netty.channel.nio.nioEventLoop.processSelectedKey(NioEventLoop.java:714)在在io.netty.channel.nio.nioEventLoop.run(nioEventLoop.java:493),在io.netty.util.concurrent.singleThreadEventExecutor$4.run(singleThreadEventExecutor.java:989),在io.netty.util.internal.ThreadExecutorMap$2.run(threadExecutorMap.java:74),在io.netty.util.concurrent.fastThreadLocalRunnable.java:30,在
所以我将客户端代码从
@Configuration
public class ClientConfiguration {
@Bean
public RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
到
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.builder()
.rsocketStrategies(rSocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
}
这个小改动帮助,不发生异常。另一个问题是,来自客户端(请求者)的请求由服务器(响应者)一个接一个地处理。我创建了SOAPUI REST项目,并在2个线程中运行GET请求。它看起来像服务器使用单线程。这不是我期望达到的目标。
@Controller
public class MarketDataRSocketController {
Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
logger.info("Getting data for: "+marketDataRequest);
Mono<MarketData> result = marketDataRepository.getOne(marketDataRequest.getStock());
logger.info("Controller thread move forward: "+marketDataRequest);
return result;
}
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}
}
@Component
public class MarketDataRepository {
Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);
private static final int BOUND = 100;
private Random random = new Random();
public Mono<MarketData> getOne(String stock) {
//return return Mono.just(getMarketDataResponse(stock)); original code from baeldung.
return Mono.just(stock).map(s -> getMarketDataResponse(s));
}
private MarketData getMarketDataResponse(String stock) {
logger.info("Repository thread go speel ZzzZZ");
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
logger.info("Repository thread move forward");
return new MarketData(stock, random.nextInt(BOUND));
}
}
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.builder()
.rsocketStrategies(rSocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
}
@RestController
public class MarketDataRestController {
Logger logger = LoggerFactory.getLogger(MarketDataRestController.class);
private final Random random = new Random();
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
logger.info("Get REST call for stock : "+stock);
return rSocketRequester.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
}
在客户端日志中,我得到:
2021-09-01 11:30:14,614信息[reactor-http-nio-2]com.baeldung.spring.rsocket.client.marketDataRestController:获取股票的REST调用:pko
2021-09-01 11:30:14,691信息[reactor-http-nio-3]com.baeldung.spring.rsocket.client.marketDataRestController:获取股票的REST调用:pko
在服务器中,我获得如下日志:
//从客户端获取数据
2021-09-01 11:30:14,843 INFO[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketdatarsocketController:获取数据:MarketDataRequest(stock=pko)
//记录Contoller线程在调用存储库后前进
2021-09-01 11:30:14,844 INFO[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketDataRocketController线r>2021-09-01 11:30:14,862信息[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketDataRepository:repository thread go speel zzzzz
//repository finish work
2021-09-01 11:30:24,863信息[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketDataRepository thread move forward
服务器只处理单个调用,等待存储库完成作业。然后以类似的方式处理下一个请求:
2021-09-01 11:30:24,874信息[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketdatarsocketcontroller:获取数据:MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874信息[reactor-http-nio-3]com.baeldung.spring.rsocketcontroller cket.server.marketdatarepository:Repository thread go speel zzzzz
2021-09-01 11:30:34,876 INFO[reactor-http-nio-3]com.baeldung.spring.rsocket.server.marketdatarepository:Repository thread move forward
我不明白为什么服务器在一个接一个地处理调用。也许代码中有问题,或者我没有理解正确的东西。提前谢谢你。
在Reactor中,默认情况下,所有内容都在主线程上运行。调用thread.sleep
,主线程阻塞,应用程序冻结。如果希望模拟长时间运行的操作,可以使用delayElements运算符:
.delayElements(Duration.ofSeconds(10));
注:Reactor阻塞猎犬检测并报告此类阻塞呼叫。
我正在使用Spring Boot构建一个RESTful web服务。我的IDE是Eclipse Oxygen。 这里是我的控制器代码: 我的控制台输出是: 控制台输出显示每5秒调用一次控制器。但我每隔两秒就发送一次请求。 是否可以接受来自同一会话的并发多个请求? 谢谢!
问题内容: 我的Flask应用程序必须进行大量计算才能获取特定页面。在Flask执行该功能时,其他用户无法访问该网站,因为Flask忙于进行大量计算。 有什么方法可以使我的Flask应用程序接受来自多个用户的请求? 问题答案: 是的,将应用程序部署在其他WSGI服务器上,请参阅Flask部署选项文档。 Flask随附的服务器组件实际上仅用于开发应用程序时;即使可以将其配置为处理并发请求(从Flas
问题内容: 我所拥有的: 我有一个nodejs express服务器获取端点,该端点依次调用其他耗时的API(例如大约2秒)。我已经通过回调调用了此函数,使得res.send作为回调的一部分被触发。res.send对象打包了一个对象,该对象将在执行这些耗时的API调用的结果之后创建。因此,仅当我从API调用中获得全部信息时,才能发送我的res.send。 一些代表性的代码。 我要什么 我希望我的服
表模式如下: 表A的主键[ID1(分区键)id2(分区键)id3(群集键)] 表B主键[ID1(分区键)id2(分区键)状态(聚类键)id3(聚类键)] 那么在卡桑德拉我该怎么解决呢?
问题内容: 我有一个返回promise的方法,并且在内部该方法调用一个API,该API每分钟只能有20个请求。问题是我有很多对象(大约300个),并且我想为每个对象调用API。 目前,我有以下代码: 但是它不处理时序约束。我希望我可以使用_.chunk和_.debounce之类的东西,但是我无法解决这个问题。有人可以帮我吗? 问题答案: 您可以每分钟发送1个包含20个请求的块,或者每3秒将其间隔1
假设我有一个Employee类。它有很多字段,比如id、名字、姓氏、姓名、年龄、薪水和其他字段。现在,我正在进行一个Get查询,希望使用所有这些字段(required=false)作为请求参数传递。 但问题是,可能有许多组合,如(firstName, age)或(age,工资,lastName)或(指定,年龄,工资,lastName)等等。那么我应该如何处理所有这些筛选器。我必须为每个案例编写每个