任务队列中的Task有3种典型使用场景
用户程序自定义的普通任务
package com.jl.java.web.nettyserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 说明
* 1.自定义一个handler 需要继承netty规定好的某个HandlerAdapter(规范)
* @author jiangl
* @version 1.0
* @date 2021/5/23 10:41
*/
public class MyNettyServerHandler2 extends ChannelInboundHandlerAdapter {
/**
* 读取数据实际(可以读取客户端发送的消息)
* @param ctx 上下文对象,含有管道pipeline ,通道channel
* @param msg 客户端发送的数据,默认Object类型
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//当任务中存在耗时业务逻辑时,可以使用异步执行
//解决方案1:用户程序自定义的普通任务
ctx.channel().eventLoop().execute(()->{
try {
Thread.sleep(10 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端1~",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ctx.channel().eventLoop().execute(()->{
try {
Thread.sleep(1 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端2~",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("go on....");
}
/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
/**
* 将数据写入到缓冲并刷新
* 一般需要对发送的数据进行编码
*/
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
}
/**
* 处理异常,关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
用户自定义定时任务
package com.jl.java.web.nettyserver;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
* 说明
* 1.自定义一个handler 需要继承netty规定好的某个HandlerAdapter(规范)
* @author jiangl
* @version 1.0
* @date 2021/5/23 10:41
*/
public class MyNettyServerHandler3 extends ChannelInboundHandlerAdapter {
/**
* 读取数据实际(可以读取客户端发送的消息)
* @param ctx 上下文对象,含有管道pipeline ,通道channel
* @param msg 客户端发送的数据,默认Object类型
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//当任务中存在耗时业务逻辑时,可以使用异步执行
//解决方案2:用户自定义定时任务-》该任务是提交到,scheduleTaskQueue中
ctx.channel().eventLoop().schedule(()->{
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端1~",CharsetUtil.UTF_8));
},10, TimeUnit.SECONDS);
System.out.println("go on....");
}
/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
/**
* 将数据写入到缓冲并刷新
* 一般需要对发送的数据进行编码
*/
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
}
/**
* 处理异常,关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
非当前Reactor线程调用Channel的各种方法
例如在推送系统的业务线程里,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费
package com.jl.java.web.nettyserver;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author jiangl
* @version 1.0
* @date 2021/5/22 23:39
*/
public class MyNettyServer4 {
public static void main(String[] args) throws InterruptedException {
//创建BossGroup workerGroup
//说明
//1.创建两个线程组bossGroup workerGroup
//2.bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
//3.两个都是无限循环
//4.bossGroup 和 workerGroup含有的子线程(NioEventLoop)的个数
//默认实际 cpu 核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
List<SocketChannel> list = new ArrayList<>();
//使用链式编程来进行设置
bootstrap.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
//创建一个通道测试对象
/**
* 给pipline 设置处理器
* @param ch
* @throws Exception
*/
@Override
protected void initChannel(SocketChannel ch) throws Exception {
list.add(ch);
ch.pipeline().addLast(new MyNettyServerHandler3());
//定时任务线程池
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
});
/**
* 定时任务延迟10秒后,每个10进行一次推送
*/
scheduledThreadPoolExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if(list.size()>0){
for(SocketChannel socketChannel : list){
socketChannel.writeAndFlush(Unpooled.copiedBuffer("定时推送",CharsetUtil.UTF_8));
}
}
}
},10,10,TimeUnit.SECONDS);
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("........服务器 is ready");
//绑定一个端口并且同步,生成了一个ChannelFuture对象
ChannelFuture sync = bootstrap.bind(6668).sync();
//对关闭通道进行监听
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的关闭
bossGroup.shutdownGracefully();
}
}
}
Dispatcher
模式,IO多路复用监听事件,我觉得这个名字更贴合该模式的含义,即 I/O 多路复用监听事件,收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程。aio
系列函数是由 POSIX 定义的异步操作接口,不是真正的操作系统级别支持的,而是在用户空间模拟出来的异步,并且仅仅支持基于本地文件的 aio 异步操作,网络编程中的 socket 是不支持的,这也使得基于 Linux 的高性能网络程序都是使用 Reactor 方案。IOCP
,是由操作系统级别实现的异步 I/O,真正意义上异步 I/O,因此在 Windows 里实现高性能网络程序可以使用效率更高的 Proactor 方案。基本介绍
io.netty.util.concurren.Future说明
io.netty.channel.ChannelFuture说明
在使用Netty进行编程时,拦截操作和转换出入栈数据只需要开发则提供callback或利用Future即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码。
Netty框架的目的就是让业务逻辑从网络基础应用编码中分离出来、解脱出来
案例
Future-Listener机制
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作
举例说明
演示:绑定端口是异步操作,当绑定操作处理完,将会调用响应的监听器处理逻辑
ChannelFuture sync = bootstrap.bind(6668).addListener(future -> {
if(future.isSuccess()){
System.out.println("端口绑定成功");
}else{
System.out.println("端口绑定失败");
}
})
小结:相比传统阻塞I/O,执行I/O操作后线程会被阻塞主,直到操作完成;异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量