实例要求
步骤
代码
NettyServer
package com.jl.java.web.nettyserver;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author jiangl
* @version 1.0
* @date 2021/5/22 23:39
*/
public class MyNettyServer {
public static void test(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,bossGroup);
}
public void test2(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup);
}
public void test4(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup);
}
public static void main(String[] args) throws InterruptedException {
//创建BossGroup workerGroup
//说明
//1.创建两个线程组bossGroup workerGroup
//2.bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
//3.两个都是无限循环
//4.bossGroup 和 workerGroup含有的子线程(NioEventLoop)的个数
//默认实际 cpu 核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
//创建一个通道测试对象
/**
* 给pipline 设置处理器
* @param ch
* @throws Exception
*/
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyNettyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("........服务器 is ready");
//绑定一个端口并且同步,生成了一个ChannelFuture对象
ChannelFuture sync = bootstrap.bind(6668).sync();
//对关闭通道进行监听
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的关闭
bossGroup.shutdownGracefully();
}
}
}
NettyServerHandler
package com.jl.java.web.nettyserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 说明
* 1.自定义一个handler 需要继承netty规定好的某个HandlerAdapter(规范)
* @author jiangl
* @version 1.0
* @date 2021/5/23 10:41
*/
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据实际(可以读取客户端发送的消息)
* @param ctx 上下文对象,含有管道pipeline ,通道channel
* @param msg 客户端发送的数据,默认Object类型
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 "+ Thread.currentThread().getName());
System.out.println("server ctx="+ctx);
//将msg 转成一个 ByteBuf
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端的地址是:"+ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
/**
* 将数据写入到缓冲并刷新
* 一般需要对发送的数据进行编码
*/
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
}
/**
* 处理异常,关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
NettyClient
package com.jl.java.web.nettyserver;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author jiangl
* @version 1.0
* @date 2021/5/23 10:50
*/
public class MyNettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是ServerBootstrap 而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类(反射来实现)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入自己的处理器
ch.pipeline().addLast(new MyNettyClientHandler());
}
});
System.out.println("客户端 ok....");
//启动客户端,去连接服务器端
//关于ChannelFuture要分析,涉及到netty的异步模型
ChannelFuture sync = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
NettyClientHandler
package com.jl.java.web.nettyserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author jiangl
* @version 1.0
* @date 2021/5/23 10:57
*/
public class MyNettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪时就会触发该方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client "+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务器。。。。", CharsetUtil.UTF_8));
}
/**
* 当通道有读取事件时,会触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}