当前位置: 首页 > 知识库问答 >
问题:

Netty-writeAndFlush和消息排序

穆劲
2023-03-14

我正在尝试实现分布式参与者模型,该模型使用Netty作为通信协议——具有TCP连接的NIO版本。

假设我们有2个节点(机器),每个节点都有Netty的服务器实例,它们将传入的消息传递给该节点上的参与者。我希望为同一对远程参与者保持消息顺序,因此我的解决方案是使用异步writeAndFlush方法将消息发送到远程节点和参与者-当在第一条消息交付之前需要向同一参与者发送另一条消息时,我会将其添加到缓冲区,并回调writeAndFlush消息,处理缓冲区中的下一个。看起来是这样的:

   channel.writeAndFlush(message).addListener(new MessageListener(mailboxOfSelector));

回调方法是:

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {

        Queue<RemoteMessage> unsentToMailbox = unsentMessages.get(mailboxOfSelector);

        if (!unsentToMailbox.isEmpty()) {
            RemoteMessage message = unsentToMailbox.poll();
            channel.writeAndFlush(message).addListener(this);
        }
    }

所以,如果A和B是两个与通道连接的服务器实例,我们从A发送-

当它完成A上的最后一个处理程序时返回,还是当它被传递到B上的第一个处理程序时返回?

共有1个答案

艾仲渊
2023-03-14

Netty5中。alpha2版本。将数据刷新到socketchannel后,Netty然后回调operationComplete方法。在这种情况下,这并不意味着数据到达客户端。这意味着数据已发送到TCP协议栈。您可以在源代码中看到以下内容:

io.netty.channel.ChannelOutboundBuffer.java

它将调用的promise.try成功()删除()方法或删除(原因原因),女巫可以触发操作完成()方法。

 类似资料:
  • 在什么情况下netty中的writAndFlush函数返回false。我们如何调试它?我需要启用某些东西才能在通道上写入吗?我还检查了通道,它是可写的。 谢谢,Adib

  • 看下面的代码: 该代码是从 netty 的安全聊天客户端类 http://netty.io/wiki/user-guide-for-4.x.html 修改而来的,并添加了行 在while循环之前。服务器上的输出不读取该行。我不明白为什么会这样,对我来说,ch.writeandflush方法本身似乎在循环之外不起作用。 如果我不应该在循环之外使用ch.writeandlfush,有没有更好的方法在启

  • 我正在使用netty构建一个应用程序。在应用程序中,我需要处理传入和传出的消息。要求是应用程序将发送的任何消息都应由特定的处理程序处理,进入应用程序的任何消息都应由另一个特定的处理程序处理。但是,我希望在两个处理程序之间交换消息,以便能够跟踪发送的消息响应,因为请求消息也将发送到应用程序。 请任何想法hwo实施这样的要求。这个问题听起来可能不相关,但这就是我得到的,我还不是一个网络极客。我读到的关

  • 问题内容: 我正在尝试使用Netty迈出第一步,为此目的,我在Netty上编写了简单的服务器,在oio纯TCP上编写了简单的客户端。 客户端发送随机文本数据包,并且必须接收“ Ack”消息。请参阅处理程序方法: 问题是-当我尝试发送回“ Ack”字符串时-客户端什么也没收到。但是,当我尝试发送回来的消息时,它工作正常,并且我在客户端看到回显。 方法需要,我尝试发送-但没有任何反应。我也尝试发送(我

  • 我刚刚开始学习Netty,想慢慢来真正理解它是如何工作的。我有一个基于独立套接字测试程序的初始用例: 从客户端连接到服务器时,立即发送消息并处理响应 很简单...或者我是这么想的。我已经看了好几天了,不太明白为什么它的表现不如预期。 这是最初的测试程序,它再次简单地连接到远程服务器,并立即向服务器写入字节缓冲区。然后服务器立即发送一个ack响应,该响应被写入控制台。 我用Netty做了同样的测试,

  • 我是Netty的新手,需要以自定义方式处理消息。我有以下接口: 现在我从客户端收到一些数据,并希望对其进行处理。我正在实现