当前位置: 首页 > 教程 > Netty >

Netty实践-处理基于流的传输

精华
小牛编辑
237浏览
2023-03-14

TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据。 例如,假设操作系统的TCP/IP堆栈已收到三个数据包:

由于基于流的协议的这种通用属性,在应用程序中以下面的碎片形式(只是其中的一种)读取它们的机会很高:

因此,接收部分,无论是服务器侧还是客户端侧,都应该将接收到的数据碎片整理成逻辑可由应用容易地理解的一个或多个有意义的帧。 在上述示例的情况下,接收的数据应该如下成帧:

针对上面的问题,下面列出了两个解决方案。

第一个解决方案

现在我们回到TIME客户端示例。在这里有同样的问题。 32位整数可以算是非常少量的数据量了,并且不可能经常被分段。 然而,问题是它可以分割,并且碎片的可能性将随着流量增加而增加。

简单的解决方案是创建一个内部累积缓冲区,并等待所有4个字节被接收到内部缓冲区。 以下是修正的TimeClientHandler实现,它修复了问题:

package com.yiibai.netty.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandler有两个生命周期侦听器方法:handlerAdded()handlerRemoved()。 只要不会阻塞很长时间,您就可以执行任意初始化任务。

  2. 首先,所有接收到的数据应累加到buf中。

  3. 然后,处理程序必须检查buf是否有足够的数据(在此示例中为4个字节),当足够时就继续进行实际的业务逻辑。否则,在有更多数据到达时Netty将再次调用channelRead()方法,最终累积到达4个字节再执行实际的业务。

第二个解决方案

虽然第一个解决方案已经解决了TIME客户端的问题,但修改的处理程序看起来不那么干净。想象如果一个更复杂的协议,它由多个字段组成,例如:可变长度字段等。上面的ChannelInboundHandler实现很快就无法维护了。

可能已经注意到,可以向ChannelPipeline添加多个ChannelHandler,因此,可将一个单片的ChannelHandler拆分为多个模块,以降低应用程序的复杂性。 例如,可将TimeClientHandler拆分为两个处理程序:

  • TimeDecoder处理碎片问题
  • TimeClientHandler的初始简单版本

幸运的是,Netty提供了一个可扩展类,可以帮助我们方便地编写:

package com.yiibai.netty.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelInboundHandler的一个实现,它使得处理碎片问题变得容易。

  2. ByteToMessageDecoder在接收到新数据时,使用内部维护的累积缓冲区调用decode()方法。

  3. decode()可以决定在累积缓冲区中没有足够数据的情况下不添加任何东西。 当接收到更多数据时,ByteToMessageDecoder将再次调用decode()

  4. 如果decode()将对象添加到out,则意味着解码器成功地解码了消息。 ByteToMessageDecoder将丢弃累积缓冲区的读取部分。要记住,不需要解码多个消息。 ByteToMessageDecoder将继续调用decode()方法,直到它没有再有任何东西添加。

现在我们有另一个处理程序插入ChannelPipeline,应该在TimeClient中修改ChannelInitializer实现:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果您喜欢折腾,也可以想尝试使用ReplayDecoder,这简化了解码器更多的工作。但需要参考API参考以获得更多信息。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,Netty提供了现成的解码器,使我们能够非常容易地实现大多数的协议,并帮助您避免使用一个单一的不可维护的处理程序实现。有关更多详细示例,请参阅以下示例:

二进制协议实现: Netty实践-factorial服务器
基于文本行的协议实现: Netty实践-telnet服务器