当前位置: 首页 > 工具软件 > Netty > 使用案例 >

Netty-Netty入门实例

裘光启
2023-12-01

Netty入门实例-TCP服务

  • 实例要求

    • Netty服务器在6668端口监听,客户端能发送消息给服务器“hello,服务器”
    • 服务器可以回复消息给客户端“hello,客户端”
  • 步骤

    • 服务端步骤
      • 创建了两个NIOEventLoopGroup线程组,BossGroup和WorkerGroup
        • BossGroup只处理连接请求,真正的客户端的业务处理会交给WorkerGroup
        • 两个NioEventLoopGroup都是无限循环的
      • 创建服务端的启动对象(根据服务端启动引导类ServerBootstrap),配置参数
      • 使用链式进行编程,调用group()方法,设置两个线程组,BossGroup和WorkerGroup,从而确定了线程模型
      • 调用channel()方法,给Netty指定IO模型为NIO
        • NioServerSocketChannel:指定服务器的IO模型为NIO,与BIO编程模型中的serverSocket对应
        • NioSocketChannel:指定客户端的IO模型为NIO,与BIO编程模型中的Socket对应
      • 调用option()方法,设置队列得到的连接个数
      • 调用childOption()方法,设置保持活动的连接状态
      • 调用childHandler()方法,给引导类创建一个ChannelInitializer,然后在initChannel方法中调用socketChannel.pipline().addLast()方法,添加自定义的服务端的业务处理消息逻辑MyNettyServerHandler
      • 调用bind()方法给ServerBootstrap绑定端口
    • 客户端步骤
      • 创建客户端的NIOEventLoopGroup
      • 创建客户端启动对象(根据客户端启动引导类Bootstrap)
      • 设置相关参数
      • 调用group方法设置线程组
      • 调用channel()方法,给Netty指定IO模型为NIO
        • NioSocketChannel:指定客户端的IO模型为NIO,与BIO编程模型中的Socket相对应
      • 调用childHandler()方法,给引导类创建一个ChannelInitializer,然后在initChannel方法中调用socket.pipiline.addLast()方法,添加自定义的客户端业务处理消息逻辑MyNettyClientHandler
      • 调用Bootstrap.connect()进行连接,这个方法有两个参数
        • inetHost:ip地址
        • inetPort:端口号
  • 代码

    • NettyServer

      • package com.jl.java.web.nettyserver;
        
        import io.netty.bootstrap.ServerBootstrap;
        import io.netty.channel.*;
        import io.netty.channel.nio.NioEventLoopGroup;
        import io.netty.channel.socket.SocketChannel;
        import io.netty.channel.socket.nio.NioServerSocketChannel;
        import io.netty.channel.socket.nio.NioSocketChannel;
        
        /**
         * @author jiangl
         * @version 1.0
         * @date 2021/5/22 23:39
         */
        public class MyNettyServer {
        
            public static void test(){
                EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup,bossGroup);
            }
        
            public void test2(){
                EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                EventLoopGroup workGroup = new NioEventLoopGroup();
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup,workGroup);
        
            }
        
            public  void test4(){
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workGroup = new NioEventLoopGroup();
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup,workGroup);
            }
        
            public static void main(String[] args) throws InterruptedException {
                //创建BossGroup workerGroup
                //说明
                //1.创建两个线程组bossGroup workerGroup
                //2.bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
                //3.两个都是无限循环
                //4.bossGroup 和 workerGroup含有的子线程(NioEventLoop)的个数
                    //默认实际 cpu 核数 * 2
                EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        
                try {
                    //创建服务器端的启动对象,配置参数
                    ServerBootstrap bootstrap = new ServerBootstrap();
        
                    //使用链式编程来进行设置
                    bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                            .channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
                            .option(ChannelOption.SO_BACKLOG,128)//设置队列得到连接个数
                            .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                //创建一个通道测试对象
                                /**
                                 * 给pipline 设置处理器
                                 * @param ch
                                 * @throws Exception
                                 */
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
                                    ch.pipeline().addLast(new MyNettyServerHandler());
                                }
                            });//给workerGroup的EventLoop对应的管道设置处理器
                    System.out.println("........服务器 is ready");
        
                    //绑定一个端口并且同步,生成了一个ChannelFuture对象
                    ChannelFuture sync = bootstrap.bind(6668).sync();
        
                    //对关闭通道进行监听
                    sync.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //优雅的关闭
                    bossGroup.shutdownGracefully();
                }
            }
        }
        
        
        
    • NettyServerHandler

      • package com.jl.java.web.nettyserver;
        
        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.Unpooled;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.ChannelInboundHandlerAdapter;
        import io.netty.util.CharsetUtil;
        
        /**
         * 说明
         * 1.自定义一个handler 需要继承netty规定好的某个HandlerAdapter(规范)
         * @author jiangl
         * @version 1.0
         * @date 2021/5/23 10:41
         */
        public class MyNettyServerHandler extends ChannelInboundHandlerAdapter {
        
            /**
             * 读取数据实际(可以读取客户端发送的消息)
             * @param ctx 上下文对象,含有管道pipeline ,通道channel
             * @param msg 客户端发送的数据,默认Object类型
             * @throws Exception
             */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("服务器读取线程 "+ Thread.currentThread().getName());
                System.out.println("server ctx="+ctx);
                //将msg 转成一个 ByteBuf
                ByteBuf buf = (ByteBuf) msg;
                System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
                System.out.println("客户端的地址是:"+ctx.channel().remoteAddress());
            }
        
            /**
             * 数据读取完毕
             * @param ctx
             * @throws Exception
             */
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                /**
                 * 将数据写入到缓冲并刷新
                 * 一般需要对发送的数据进行编码
                 */
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
            }
        
            /**
             * 处理异常,关闭通道
             * @param ctx
             * @param cause
             * @throws Exception
             */
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ctx.close();
            }
        }
        
    • NettyClient

      • package com.jl.java.web.nettyserver;
        
        import io.netty.bootstrap.Bootstrap;
        import io.netty.channel.ChannelFuture;
        import io.netty.channel.ChannelInitializer;
        import io.netty.channel.EventLoopGroup;
        import io.netty.channel.nio.NioEventLoopGroup;
        import io.netty.channel.socket.SocketChannel;
        import io.netty.channel.socket.nio.NioSocketChannel;
        
        /**
         * @author jiangl
         * @version 1.0
         * @date 2021/5/23 10:50
         */
        public class MyNettyClient {
            public static void main(String[] args) throws InterruptedException {
                //客户端需要一个事件循环组
                EventLoopGroup eventExecutors = new NioEventLoopGroup();
        
                try {
                    //创建客户端启动对象
                    //注意客户端使用的不是ServerBootstrap 而是Bootstrap
                    Bootstrap bootstrap = new Bootstrap();
        
                    //设置相关参数
                    bootstrap.group(eventExecutors)//设置线程组
                            .channel(NioSocketChannel.class)//设置客户端通道的实现类(反射来实现)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
                                    //加入自己的处理器
                                    ch.pipeline().addLast(new MyNettyClientHandler());
                                }
                            });
                    System.out.println("客户端 ok....");
                    //启动客户端,去连接服务器端
                    //关于ChannelFuture要分析,涉及到netty的异步模型
                    ChannelFuture sync = bootstrap.connect("127.0.0.1", 6668).sync();
                    //给关闭通道进行监听
                    sync.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    eventExecutors.shutdownGracefully();
                }
        
            }
        }
        
    • NettyClientHandler

      • package com.jl.java.web.nettyserver;
        
        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.Unpooled;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.ChannelInboundHandlerAdapter;
        import io.netty.util.CharsetUtil;
        
        /**
         * @author jiangl
         * @version 1.0
         * @date 2021/5/23 10:57
         */
        public class MyNettyClientHandler extends ChannelInboundHandlerAdapter {
        
            /**
             * 当通道就绪时就会触发该方法
             * @param ctx
             * @throws Exception
             */
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("client "+ctx);
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务器。。。。", CharsetUtil.UTF_8));
            }
        
            /**
             * 当通道有读取事件时,会触发
             * @param ctx
             * @param msg
             * @throws Exception
             */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
                System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
            }
        
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                cause.printStackTrace();
                ctx.close();
            }
        }
        

 类似资料: