java用apache.commons.pool2实现一个连接池来管理netty的客户端长连接

郭弘盛
2023-12-01

 

一、apache.commons.pool2介绍

Apache Commons Pool 库提供了一整套用于实现对象池化的 API,以及若干种各具特色的对象池实现。2.0 版本并非对 1.x 的简单升级,而是一次完全重写的对象池实现,显著地提升了性能和可伸缩性,并且包含可靠的实例跟踪和池监控。简单来说,它就是一个对象池框架。

二、连接池实现

1.maven 引用

    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.42.Final</version>
    </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.8.0</version>
        </dependency>

2.先创建一个pool的基类

Pool.java
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.io.Closeable;

/**
 * Base class for resource pools; wraps the corresponding methods of
 * {@link GenericObjectPool}.
 *
 * <p>The wrapped pool is {@code null} until {@link #initPool} has run (the no-arg
 * constructor does not create it). Methods of this class report failures by
 * printing the stack trace and returning a sentinel ({@code null} or {@code -1})
 * instead of throwing.
 *
 * @param <T> type of the pooled resource
 */
public abstract class Pool<T> implements Closeable {

  /** Underlying pool; {@code null} until {@link #initPool} has been called. */
  protected GenericObjectPool<T> internalPool;

  /** Creates a pool wrapper without an underlying pool; call {@link #initPool} before use. */
  public Pool() {
  }

  /**
   * Creates the wrapper and its underlying pool.
   *
   * @param poolConfig configuration handed to {@link GenericObjectPool}
   * @param factory    factory that creates, validates and destroys pooled objects
   */
  public Pool(final GenericObjectPoolConfig<T> poolConfig, PooledObjectFactory<T> factory) {
    initPool(poolConfig, factory);
  }

  /** Closes the underlying pool; equivalent to {@link #destroy()}. */
  @Override
  public void close() {
    destroy();
  }

  /**
   * @return {@code true} when the underlying pool has been closed or was never
   *         initialised, {@code false} otherwise
   */
  public boolean isClosed() {
    // Guard against the no-arg constructor path, where internalPool is still null.
    return poolInactive();
  }

  /**
   * (Re)creates the underlying pool. An already existing pool is closed first.
   *
   * @param poolConfig configuration handed to {@link GenericObjectPool}
   * @param factory    factory that creates, validates and destroys pooled objects
   */
  public void initPool(final GenericObjectPoolConfig<T> poolConfig, PooledObjectFactory<T> factory) {

    if (this.internalPool != null) {
      try {
        closeInternalPool();
      } catch (Exception e) {
        // Best effort: a failure to close the old pool must not prevent creating
        // the new one, but it should not disappear silently either.
        e.printStackTrace();
      }
    }

    this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
  }

  /**
   * Borrows a resource from the pool.
   *
   * @return the borrowed resource, or {@code null} when borrowing failed (the
   *         failure is printed to standard error); callers must check for {@code null}
   */
  public T getResource() {
    try {
      return internalPool.borrowObject();
    } catch (InterruptedException e) {
      // Keep the interrupt visible to the caller instead of clearing it here.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }

  /**
   * Returns a resource to the pool; a {@code null} resource is ignored.
   *
   * @param resource resource previously obtained from {@link #getResource()}
   */
  protected void returnResourceObject(final T resource) {
    if (resource == null) {
      return;
    }
    try {
      internalPool.returnObject(resource);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Invalidates a resource that is no longer usable; a {@code null} resource is ignored.
   *
   * @param resource resource previously obtained from {@link #getResource()}
   */
  protected void returnBrokenResource(final T resource) {
    if (resource != null) {
      returnBrokenResourceObject(resource);
    }
  }

  /**
   * Returns a healthy resource to the pool; a {@code null} resource is ignored.
   *
   * @param resource resource previously obtained from {@link #getResource()}
   */
  protected void returnResource(final T resource) {
    if (resource != null) {
      returnResourceObject(resource);
    }
  }

  /** Closes the underlying pool. */
  public void destroy() {
    closeInternalPool();
  }

  /**
   * Asks the underlying pool to invalidate the given resource.
   *
   * @param resource resource to invalidate
   */
  protected void returnBrokenResourceObject(final T resource) {
    try {
      internalPool.invalidateObject(resource);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /** Closes the underlying pool if one exists; does nothing when it was never initialised. */
  protected void closeInternalPool() {
    if (internalPool == null) {
      // Nothing to close: avoids a NullPointerException on close()/destroy()
      // of a pool created with the no-arg constructor.
      return;
    }
    try {
      internalPool.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /** @return value of the pool's {@code getNumActive()}, or {@code -1} when the pool is inactive */
  public int getNumActive() {
    if (poolInactive()) {
      return -1;
    }

    return this.internalPool.getNumActive();
  }

  /** @return value of the pool's {@code getNumIdle()}, or {@code -1} when the pool is inactive */
  public int getNumIdle() {
    if (poolInactive()) {
      return -1;
    }

    return this.internalPool.getNumIdle();
  }

  /** @return value of the pool's {@code getNumWaiters()}, or {@code -1} when the pool is inactive */
  public int getNumWaiters() {
    if (poolInactive()) {
      return -1;
    }

    return this.internalPool.getNumWaiters();
  }

  /**
   * @return value of the pool's {@code getMeanBorrowWaitTimeMillis()}, or {@code -1}
   *         when the pool is inactive
   */
  public long getMeanBorrowWaitTimeMillis() {
    if (poolInactive()) {
      return -1;
    }

    return this.internalPool.getMeanBorrowWaitTimeMillis();
  }

  /**
   * @return value of the pool's {@code getMaxBorrowWaitTimeMillis()}, or {@code -1}
   *         when the pool is inactive
   */
  public long getMaxBorrowWaitTimeMillis() {
    if (poolInactive()) {
      return -1;
    }

    return this.internalPool.getMaxBorrowWaitTimeMillis();
  }

  /** @return {@code true} when there is no underlying pool or it has been closed */
  private boolean poolInactive() {
    return this.internalPool == null || this.internalPool.isClosed();
  }

  /**
   * Adds {@code count} objects to the pool by calling {@code addObject()} repeatedly.
   * The first failure is printed and stops the loop.
   *
   * @param count number of objects to add
   */
  public void addObjects(int count) {
    try {
      for (int i = 0; i < count; i++) {
        this.internalPool.addObject();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

代码解读:

GenericObjectPool 是 commons-pool2 提供的对象池实现类,上面的 Pool 类对 GenericObjectPool 的相应方法进行了封装,用作各类资源池的基类。

3.创建  NettyChannel 类,是netty连接实现

  NettyChannel.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Netty client connection object: connects to {@code host:port} when constructed
 * and exposes the resulting {@link Channel}.
 */
public class NettyChannel {
    /**
     * Event loop group shared by every connection. Creating a new group inside
     * each {@link #connect()} call leaked one group per connection, because
     * nothing ever shut those groups down.
     */
    private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup();

    private final String host;
    private final int port;
    private Channel ch;

    /**
     * Creates the object and connects immediately.
     *
     * @param host remote host
     * @param port remote port
     * @throws IllegalStateException when the connection cannot be established;
     *         the original failure is attached as the cause
     */
    NettyChannel(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            connect();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while connecting to " + host + ":" + port, e);
        } catch (Exception e) {
            // Failing here keeps an object with a null channel from being handed out.
            throw new IllegalStateException("Failed to connect to " + host + ":" + port, e);
        }
    }

    /** @return the channel of the most recent successful {@link #connect()} */
    public Channel getCh() {
        return ch;
    }

    /**
     * Connects to the configured host and port and blocks until the attempt
     * completes. A channel left over from an earlier call is closed once the
     * new one is established.
     *
     * @throws Exception when the connection attempt fails or the wait is interrupted
     */
    public void connect() throws Exception {
        Bootstrap b = new Bootstrap();
        b.group(WORKER_GROUP).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline p = socketChannel.pipeline();
                        // A fresh handler per channel: NettyClientHandler is not marked
                        // sharable, so one instance cannot be added to a second pipeline.
                        p.addLast(new NettyClientHandler());
                    }
                });
        Channel previous = ch;
        ch = b.connect(host, port).sync().channel();
        if (previous != null) {
            previous.close();
        }
    }
}
NettyClientHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;


public class NettyClientHandler extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx){
       //ctx.writeAndFlush(Unpooled.copiedBuffer("hi,i am netty client", CharsetUtil.UTF_8));
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        ByteBuf in = (ByteBuf)msg;
        System.out.println("server: "+in.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
4.创建ChannelPool.java,继承 Pool 类
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/**
 * Channel pool: a {@link Pool} of {@link NettyChannel} objects created by a
 * {@link ChannelFactory} for one host and port.
 */
public class ChannelPool extends Pool<NettyChannel> {

    /** Creates a pool with the given configuration. */
    public ChannelPool(final GenericObjectPoolConfig poolConfig, final String host, final int port) {
        super(poolConfig, new ChannelFactory(host, port));
    }

    /** Creates a pool with a default {@link GenericObjectPoolConfig}. */
    public ChannelPool(final String host, final int port) {
        super(new GenericObjectPoolConfig(), new ChannelFactory(host, port));
    }

    /** Delegates to the base class implementation. */
    @Override
    public NettyChannel getResource() {
        return super.getResource();
    }

    /** Invalidates the given channel; {@code null} is ignored. */
    @Override
    public void returnBrokenResource(final NettyChannel resource) {
        if (resource == null) {
            return;
        }
        returnBrokenResourceObject(resource);
    }

    /**
     * Returns the given channel to the pool; {@code null} is ignored. If
     * returning throws an exception, the channel is invalidated instead.
     */
    @Override
    public void returnResource(final NettyChannel resource) {
        if (resource == null) {
            return;
        }
        try {
            returnResourceObject(resource);
        } catch (Exception e) {
            returnBrokenResource(resource);
        }
    }

}

5.还需要创建一个工厂类

ChannelFactory.java
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;

/**
 * Channel factory: creates, validates and destroys the {@link NettyChannel}
 * objects managed by the pool.
 */
public class ChannelFactory implements PooledObjectFactory<NettyChannel> {
    private final String host;
    private final int port;

    /**
     * @param host remote host every created channel connects to
     * @param port remote port every created channel connects to
     */
    ChannelFactory(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Nothing to do when a channel is borrowed. */
    @Override
    public void activateObject(PooledObject<NettyChannel> nettyChannel) throws Exception {
        // intentionally empty
    }

    /**
     * Closes the underlying Netty channel so the connection is not left open
     * after the pool has dropped the object.
     */
    @Override
    public void destroyObject(PooledObject<NettyChannel> nettyChannel) throws Exception {
        NettyChannel conn = nettyChannel.getObject();
        if (conn != null && conn.getCh() != null) {
            // close() is asynchronous; the pool does not need to wait for it.
            conn.getCh().close();
        }
    }

    /**
     * Creates a new connected channel.
     *
     * @throws IllegalStateException when the new object has no channel, i.e.
     *         the connection attempt did not succeed
     */
    @Override
    public PooledObject<NettyChannel> makeObject() throws Exception {
        NettyChannel conn = new NettyChannel(host, port);
        if (conn.getCh() == null) {
            // Never pool an object that cannot be used; fail the borrow instead.
            throw new IllegalStateException("Could not connect to " + host + ":" + port);
        }
        return new DefaultPooledObject<NettyChannel>(conn);
    }

    /** Nothing to do when a channel is returned. */
    @Override
    public void passivateObject(PooledObject<NettyChannel> nettyChannel) throws Exception {
        // intentionally empty
    }

    /**
     * A channel is valid while it exists and is still active.
     * NOTE(review): commons-pool2 is understood to consult this only when
     * testOnBorrow / testOnReturn / testWhileIdle (or similar) is enabled in
     * the pool configuration; confirm against the pool documentation.
     */
    @Override
    public boolean validateObject(PooledObject<NettyChannel> nettyChannel) {
        NettyChannel conn = nettyChannel.getObject();
        return conn != null && conn.getCh() != null && conn.getCh().isActive();
    }

}

在此全部完毕。

如何使用呢?设置好配置创建资源

    
            GenericObjectPoolConfig<NettyChannel> config = new GenericObjectPoolConfig<NettyChannel>();

            config.setMaxIdle(10);   // maximum number of idle connections
            config.setMinIdle(1);    // minimum number of idle connections
            config.setMaxTotal(100); // maximum total number of connections

            // Create the resource pool
            ChannelPool channelPool = new ChannelPool(config, HOST, PORT);
            // Borrow a connection; getResource() returns null when borrowing fails
            NettyChannel nettyChannel = channelPool.getResource();
            if (nettyChannel != null) {
                try {
                    // Send a message
                    nettyChannel.getCh().writeAndFlush("hello word!");
                } finally {
                    // Always hand the connection back, even if sending throws
                    channelPool.returnResource(nettyChannel);
                }
            }

netty服务端代码在这里就不贴出来了,完整demo请到github上下载

https://github.com/fayechenlong/common-pool2-netty.git

注:这个框架还是比较简单易学的,大家可以以此类推尝试写一个数据库连接池等

 

 类似资料: