一、apache.commons.pool2介绍
Apache Commons Pool
库提供了一整套用于实现对象池化的API,以及若干种各具特色的对象池实现。2.0版本,并非是对1.x的简单升级,而是一个完全重写的对象池的实现,显著的提升了性能和可伸缩性,并且包含可靠的实例跟踪和池监控。具体来讲就是个对象池的框架。
二、连接池实现
1.maven 引用
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
</dependency>
2.先创建一个pool的基类
Pool.java
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.io.Closeable;
/**
* 资源池基类,封装GenericObjectPool中相应的方法
* @param <T>
*/
public abstract class Pool<T> implements Closeable {
protected GenericObjectPool<T> internalPool;
public Pool() {
}
public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
initPool(poolConfig, factory);
}
@Override
public void close() {
destroy();
}
public boolean isClosed() {
return this.internalPool.isClosed();
}
public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
if (this.internalPool != null) {
try {
closeInternalPool();
} catch (Exception e) {
}
}
this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
}
/**
* 获取一个对象资源
* @return
*/
public T getResource() {
try {
return internalPool.borrowObject();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
protected void returnResourceObject(final T resource) {
if (resource == null) {
return;
}
try {
internalPool.returnObject(resource);
} catch (Exception e) {
e.printStackTrace();
}
}
protected void returnBrokenResource(final T resource) {
if (resource != null) {
returnBrokenResourceObject(resource);
}
}
protected void returnResource(final T resource) {
if (resource != null) {
returnResourceObject(resource);
}
}
public void destroy() {
closeInternalPool();
}
protected void returnBrokenResourceObject(final T resource) {
try {
internalPool.invalidateObject(resource);
} catch (Exception e) {
e.printStackTrace();
}
}
protected void closeInternalPool() {
try {
internalPool.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public int getNumActive() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getNumActive();
}
public int getNumIdle() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getNumIdle();
}
public int getNumWaiters() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getNumWaiters();
}
public long getMeanBorrowWaitTimeMillis() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getMeanBorrowWaitTimeMillis();
}
public long getMaxBorrowWaitTimeMillis() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getMaxBorrowWaitTimeMillis();
}
private boolean poolInactive() {
return this.internalPool == null || this.internalPool.isClosed();
}
public void addObjects(int count) {
try {
for (int i = 0; i < count; i++) {
this.internalPool.addObject();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码解读:
GenericObjectPool 是对象池的类,上面的pool类是对 GenericObjectPool 进行了相应封装,用于做基类
3.创建 NettyChannel 类,是netty连接实现
NettyChannel.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* netty 客户端连接对象类
*/
public class NettyChannel {
private String host;
private int port;
private NettyClientHandler nettyClientHandler= new NettyClientHandler();
NettyChannel(String host, int port){
try {
this.host=host;
this.port=port;
connect();
}catch (Exception e){
}
}
private Channel ch;
public Channel getCh() {
return ch;
}
public void connect() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(nettyClientHandler);
}
});
ch = b.connect(host, port).sync().channel();
}
}
NettyClientHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.writeAndFlush(Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx){
//ctx.writeAndFlush(Unpooled.copiedBuffer("hi,i am netty client", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
ByteBuf in = (ByteBuf)msg;
System.out.println("server: "+in.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
4.创建ChannelPool.java用来实现 Pool 类
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
/**
* 通道池
*/
public class ChannelPool extends Pool<NettyChannel> {
public ChannelPool(final GenericObjectPoolConfig poolConfig, final String host, final int port) {
super(poolConfig,new ChannelFactory(host,port));
}
public ChannelPool( final String host, final int port) {
super(new GenericObjectPoolConfig(),new ChannelFactory(host,port));
}
@Override
public NettyChannel getResource() {
NettyChannel connection = super.getResource();
return connection;
}
@Override
public void returnBrokenResource(final NettyChannel resource) {
if (resource != null) {
returnBrokenResourceObject(resource);
}
}
@Override
public void returnResource(final NettyChannel resource) {
if (resource != null) {
try {
returnResourceObject(resource);
} catch (Exception e) {
returnBrokenResource(resource);
}
}
}
}
5.还需要创建一个工厂类
ChannelFactory.java
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
/**
* 通道工厂
*/
public class ChannelFactory implements PooledObjectFactory<NettyChannel> {
private String host;
private int port;
ChannelFactory(String host,int port){
this.host=host;
this.port=port;
}
@Override
public void activateObject(PooledObject<NettyChannel> nettyChannel) throws Exception {
}
@Override
public void destroyObject(PooledObject<NettyChannel> nettyChannel) throws Exception{
}
@Override
public PooledObject<NettyChannel> makeObject() throws Exception {
NettyChannel conn=new NettyChannel(host,port);
return new DefaultPooledObject<NettyChannel>(conn);
}
@Override
public void passivateObject(PooledObject<NettyChannel> nettyChannel) throws Exception {
// TODO maybe should select db 0? Not sure right now.
}
@Override
public boolean validateObject(PooledObject<NettyChannel> nettyChannel) {
return true;
}
}
在此全部完毕。
如何使用呢?设置好配置创建资源
GenericObjectPoolConfig config=new GenericObjectPoolConfig();
config.setMaxIdle(10);//最大活跃数
config.setMinIdle(1);//最小活跃数
config.setMaxTotal(100);//最大总数
//创建资源池
ChannelPool channelPool=new ChannelPool(config,HOST,PORT);
//获取连接
NettyChannel nettyChannel=channelPool.getResource();
//发送消息
nettyChannel.getCh().writeAndFlush("hello word!");
//回收资源
channelPool.returnResource(nettyChannel);
netty服务端代码在这里就不贴出来了,完成demo请到github上下载
https://github.com/fayechenlong/common-pool2-netty.git
注:这个框架还是比较简单易学的,大家可以以此类推尝试写一个数据库连接池等