当前位置: 首页 > 教程 > Netty >

Netty实践-时间服务器

精华
小牛编辑
186浏览
2023-03-14

本教程中实现的协议是TIME协议。 它与先前的示例不同,时间服务器只发送包含32位整数的消息,而不接收任何请求,并在消息发送后关闭连接。 在本示例中,您将学习如何构造和发送消息,以及在完成时关闭连接。

因为时间服务器将忽略任何接收到的数据,但是一旦建立连接就发送消息,所以我们不能使用channelRead()方法。而是覆盖channelActive()方法。 以下是代码的实现:

package com.yiibai.netty.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

下面我们来看看上面代码的一些解释分析:

  1. 如上所述,当建立连接并准备好生成流量时,将调用channelActive()方法。现在在这个方法中编写一个32位的整数来表示当前的时间。
  2. 要发送新消息,需要分配一个包含消息的新缓冲区。我们要写入一个32位整数,因此需要一个ByteBuf,其容量至少为4个字节。 通过ChannelHandlerContext.alloc()获取当前的ByteBufAllocator并分配一个新的缓冲区。

  3. 像之前一样,编写构造的消息。
    但是,在NIO中发送消息之前,我们是否曾调用java.nio.ByteBuffer.flip()? ByteBuf没有这样的方法,它只有两个指针; 一个用于读取操作,另一个用于写入操作。 当您向ByteBuf写入内容时,写入索引会增加,而读取器索引不会更改。读取器索引和写入器索引分别表示消息的开始和结束位置。
    相比之下,NIO缓冲区不提供一个干净的方式来确定消息内容开始和结束,而不用调用flip方法。当您忘记翻转缓冲区时,就将会遇到麻烦,因为不会发送任何或发送不正确的数据。但是这样的错误不会发生在Netty中,因为不同的操作类型我们有不同的指针。
    另一点要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法返回一个ChannelFutureChannelFuture表示尚未发生的I/O操作。这意味着,任何请求的操作可能尚未执行,因为所有操作在Netty中是异步的。 例如,以下代码可能会在发送消息之前关闭连接:

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();
    

    因此,需要在ChannelFuture完成后调用close()方法,该方法由write()方法返回,并在写入操作完成时通知其监听器。 请注意,close()也可能不会立即关闭连接,并返回一个ChannelFuture。

当写请求完成时,我们如何得到通知? 这就像向返回的ChannelFuture添加ChannelFutureListener一样简单。 在这里,我们创建了一个新的匿名ChannelFutureListener,当操作完成时关闭Channel

或者,可以使用预定义的侦听器来简化代码:

f.addListener(ChannelFutureListener.CLOSE);

要测试我们的时间服务器是否按预期工作,可以使用UNIX rdate命令:

$ rdate -o <port> -p <host>

其中<port>是在main()方法中指定的端口号,<host>通常是localhost或服务器的IP地址。

编写时间客户端

DISCARDECHO服务器不同,我们需要一个用于TIME协议的客户端,因为我们无法将32位二进制数据转换为日历上的日期。 在本节中,我们讨论如何确保服务器正常工作并学习如何使用Netty编写客户端。

Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的BootstrapChannel实现。 请看看下面的代码:

package com.yiibai.netty.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. BootstrapServerBootstrap类似,只是它用于非服务器通道,例如客户端或无连接通道。

  2. 如果只指定一个EventLoopGroup,它将同时用作boss组和worker组。boss组和worker组不是用于客户端。

  3. 不使用NioServerSocketChannel,而是使用NioSocketChannel来创建客户端通道。

  4. 注意,这里不像我们使用的ServerBootstrap,所以不使用childOption(),因为客户端SocketChannel没有父类。

  5. 应该调用connect()方法,而不是bind()方法。

如上面所见,它与服务器端代码没有什么不同。 ChannelHandler实现又是怎么样的呢? 它应该从服务器接收一个32位整数,将其转换为人类可读的格式,打印转换为我们熟知的时间格式 ,并关闭连接:

package com.yiibai.netty.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            Date currentTime = new Date(currentTimeMillis);
            System.out.println("Default Date Format:" + currentTime.toString());

            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateString = formatter.format(currentTime);
            // 转换一下成中国人的时间格式
            System.out.println("Date Format:" + dateString);

            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

(1).TCP/IP中,Netty读取从对端发送的ByteBuf数据。

客户端看起来很简单,与服务器端示例没什么区别。 但是,这个处理程序有时会拒绝抛出IndexOutOfBoundsException。 我们将在下一节讨论为什么会发生这种情况。

先运行 TimeServer.java 程序,然后再运行 TimeClient.java , 当运行 TimeClient.java时就可以到有一个时间日期输出,然后程序自动退出。输出结果如下 -

Default Date Format:Thu Mar 02 20:50:23 CST 2017
Date Format:2017-03-02 20:50:23