当前位置: 首页 > 知识库问答 >
问题:

Netty TCP Server-ByteBuff大小在解码消息时停留在1024的Project Reactor

微生乐
2023-03-14
public class StringDecoder implements Function<Buffer, String> {

    // Buffer is limited to 1024 but the message the client sent
    // was 2k
    @Override
    public String apply(Buffer bytes) {
        return bytes.toString();
    }
}

问题二:如何将apply函数(上面的)分块?这意味着,当Netty的缓冲区达到极限时,我的apply函数可以创建自己的缓冲区(并管理缓冲区),最终我可以解码消息,Reactory/Netty可以将其传递给使用者。

注意:在我的“main”方法中,以下内容用于设置环境。Netty服务器运行在Windows下,客户端运行在Linux上。这与TCP的Windows实现有关吗?

// NOTES: ServerSocketOptions sets the max buffer for send and receive to 
// something much larger than 1024. Verified with debugger
TcpServerSpec<String, String> spec = new TcpServerSpec<String, String>(NettyTcpServer.class);
spec.env(env);
spec.listen(port);
spec.dispatcher("sync");
spec.codec(new AgentCodec());
spec.consume(connectionHandler(handler));

TcpServer<String, String> tcp = spec.get();
tcp.start().await();

共有1个答案

归松
2023-03-14

好的,在调试了代码之后,我发现了问题所在--顺便说一下,这在我在Reactor项目(tisk tisk)上找到的任何文档中都找不到。

apply函数将继续被调用更多的消息。因此,如果消息是2K,apply将被调用两次。第一次用1K,第二次用2K。

注意:如果您没有前进的缓冲区位置,它将追加到缓冲区。如果您将缓冲区位置提前,则当apply再次调用时,它将被移除。

 类似资料:
  • 我将pub/sub订阅逻辑包装在subscribe方法中,该方法在服务初始化期间为每个订阅调用一次: 此方法是这样调用的: 回调方法执行一系列操作,发送2封电子邮件,然后确认消息 null 有什么想法吗?

  • 如何获得Kafka单张唱片的大小? 有一些关于我为什么需要这个的说明。 这似乎不是在ConsumerRecord或RecordMetadata类上公开的serializedValueSize。我并不真正理解这个属性的价值,因为它与对消费者有用的消息的大小不匹配。serializedValueSize用于什么? 最大Poll.Records 在对poll()的单个调用中返回的最大记录数。 这并不存在

  • 我正在运行kafka2.11-0.9.0.0和一个基于Java的生产者/消费者。与消息~70 KB一切工作良好。但是,在生产者将一个更大的70 MB消息排入队列之后,kafka似乎停止将消息传递给消费者。即。不仅大的消息没有传递,后续的小消息也没有传递。我知道制作人成功了,因为我使用了kafka回调进行确认,我可以在kafka消息日志中看到消息。 kafka配置自定义更改: 使用者配置:

  • 问题内容: 我尝试使用Python的ConfigParser模块保存设置。对于我的应用程序,重要的是要在我的部分中保留每个名称的大小写。文档提到将str()传递给ConfigParser.optionxform()可以完成此操作,但对我而言不起作用。名称均为小写。我想念什么吗? 我得到的Python伪代码: 问题答案: 该文档令人困惑。他们的意思是: 即覆盖optionxform,而不是调用它;重

  • 是否有一种方法可以强制TextInputLayout错误消息使用一行? 我尝试将放置在TextInputLayout上,其error_apperation样式包含: 但它不起作用。然而,改变颜色是有效的。我不明白,mErrorView只是一个文本视图,它应该可以工作。

  • 我的示例接受google pub/sub,记录它并将ack发送回 配置: NotificationHandler: 线程转储: 行: 但ThreadExecutor未执行deliverMessageTask。 在我看来,它看起来像是库中的一个bug,但可能是库误用。无论如何,我正在寻找任何解决方案/变通办法来避免这种情况。 我使用: spring-boot springCloudVersion=