当前位置: 首页 > 知识库问答 >
问题:

从RSocket Java客户端连接到Spring Boot RSocket服务器时出错

孟鹤龄
2023-03-14

我在通过TCP连接到Spring Boot RSocket应用程序时遇到问题。使用RSocketRequest ester时的客户端工作正常,但当我尝试使用RSocketFactory客户端连接时,它总是出错。下面的代码。

        RSocket rSocket = this.client = RSocketFactory
            .connect()
            .mimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.toString(), MediaType.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create("localhost", 7000))
            .start()
            .block();


Flux<Payload> s = rSocket.requestStream(DefaultPayload.create("1234", "socket"));
    s.subscribe();

这会产生以下错误:

java.lang.IndexOutOfBoundsException: readerIndex(1) + length(115) exceeds writerIndex(6): AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 1, widx: 6, cap: 6/6, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 1024))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1477)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1463)
at io.netty.buffer.AbstractByteBuf.readSlice(AbstractByteBuf.java:880)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:47)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:37)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extractEntry(DefaultMetadataExtractor.java:136)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extract(DefaultMetadataExtractor.java:119)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.createHeaders(MessagingRSocket.java:195)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:167)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.requestStream(MessagingRSocket.java:127)
at io.rsocket.RSocketResponder.requestStream(RSocketResponder.java:207)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:310)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:830)

据我所知,这个特殊的错误是由于netty消息包装(来自stackoverflow上的其他线程)造成的,但如何解决呢?服务器是Spring Boot 5 RSocket,但客户端仅使用RSocket Java。

共有2个答案

平学
2023-03-14

由此,使用以下内容生成元数据。

CompositeByteBuf metadata = ByteBufAllocator.DEFAULT.compositeBuffer();
RoutingMetadata routingMetadata = TaggingMetadataCodec.createRoutingMetadata(ByteBufAllocator.DEFAULT, List.of("/route"));
CompositeMetadataCodec.encodeAndAddMetadata(metadata,
        ByteBufAllocator.DEFAULT,
        WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
        routingMetadata.getContent());
危裕
2023-03-14

问题出在mime类型内部。在您的情况下,服务器等待CBOR,但您继续应用程序/json

代码解决方案:更改RSocketRequester初始化的方式,如下面的示例所示,您的客户端正在发送CBOR,您可以通过启用调试看到:logging.level.io.rsocket。帧记录器:调试。这一切都是为了hello world,不需要定制策略或在客户端实现工厂

@Bean
RSocketRequester rSocketRequester(RSocketStrategies strategies) {
    return RSocketRequester
            .builder()
            .rsocketStrategies(strategies)
            .connectTcp("127.0.0.1", 7000)
            .retry(5)
            .block();
    }

顺便说一句,即使使用自定义Encoder,我也没有在两侧使用JSON找到解决方案

 类似资料:
  • 我正在尝试使用hazelcast v3。2.4(服务器和客户端上的版本相同)。服务器(我可以安装的简单实现)正在服务器上运行。客户端尝试连接到远程服务器-服务器打印身份验证请求,但我收到以下日志输出(包括异常)-关于我可以做什么不同的想法(复制日志输出和配置文件)。我正在尝试通过TCP/IP进行连接,我检查了网络连接——我没有看到任何东西阻止连接。 堆栈中提到的代码行: 配置 日志输出 服务器输出

  • 当我尝试使用套接字将物理设备连接到服务器时,我遇到了一个问题。在服务器端,它似乎不接受任何连接,而在客户端,套接字超时。你知道为什么会这样吗? 我在下面提供我的代码 服务器代码: 客户端: 11-16 23:32:11.016:W/系统。错误(24213):java.net。ConnectException:无法连接到/192.168.1.116(端口9090):连接失败:ETIMEDOUT(连接

  • 我正在学习SSL,在这个过程中,我试图在一个服务器之间创建一个SSL连接。NET服务器和Java客户端。为此,我使用自签名证书。我不想在Java中使用标准密钥库,所以我创建了一个自定义密钥库并加载它。 我使用以下步骤来生成证书和pfx文件,以便在. NET服务器端使用。 > makecert-r-pe-sr“localhost”-$individual-n“CN=localhost”-sv。pvk

  • (**)我创建了一个 ASP.NET Web 服务器,托管 SignalR 中心。我遵循了本教程。它有效。然后我创建了一个Xamarin.Android客户端(*)。问题是我无法连接到服务器。我想,由于我的主机地址中的“本地主机”,我无法连接。例外是相当大的。 如果我的怀疑是正确的,我该如何解决?如何使本地主机看起来像普通服务器? (*) -我也试过Xamarin。一般形式。 (**) - 请看问

  • 我有一个示例Spring启动应用程序来运行图形QL服务器,具有作为客户端,我的pom有以下依赖项: 当我尝试从客户端连接时,出现以下错误: 狩猎决议好心建议。 我还有几个问题: 我应该使用SimpleGraphQLHttpServlet将请求路由到endpoint吗 我正在React UI上使用apollo client,那么它是强制使用apollo server还是spring boot可以工作

  • 下面是我的服务器代码: 下面是我的客户端活动代码: 以下是客户端活动的xml布局文件: 因此,我开始认为这不是连接端口的问题,而是应用程序的android客户端的问题。但我想不出有什么问题。 顺便说一下,当我试图发送消息时,运行客户端的手机和运行服务器的笔记本电脑都连接到了同一个网络。