对象池就是一种存放对象的池,于数据库连接池、线程池类似,都是典型的池化设计思想。
对象池的优点就是可以集中管理池中的对象,可以减少频繁创建和销毁长期使用的对象,从而可以提升复用性,节约资源的消耗,进而有效避免频繁为对象分配内存和释放堆中内存,可以有效减轻JVM垃圾回收器的负担,避免内存抖动。
Apache Common Pool2的核心内部类如下:
ObjectPool
:对象池接口,对象池实体,取用对象的地方。负责对对象进行生命周期的管理,提供了对对象池中活跃对象和空闲对象的统计功能。
borrowObject
、returnObject
。addObject
。invalidateObject
。getNumActive
、getNumIdle
。PooledObject
:被包装的对象,是池中的对象,除去对象本身之外还包含了创建时间,上次被调用时间等众多信息。PooledObjectFactory
:对象工厂,管理对象的生命周期,提供了具体对象创建、销毁、验证、钝化、激活等一系列的功能。BaseObjectPoolConfig
:提供一些必要的配置,例如空闲队列是否先进先出、工厂对象是否需要测试、对象从对象池中取出时是否测试等基础属性。
GenericObjectPoolConfig
:继承了本类做了默认配置,在实际使用过程中,继承该类即可,可以集合业务情况扩展对象池配置,例如数据库连接池线程前缀、字符串池长度或者命名规则等。KeyedObjectPool<K, V>
:键值对形式的对象池接口,使用场景较少。KeyedPooledObjectFactory<K, V>
:同上,为键值对对象池管理对象的工厂。对象池核心接口主要包括如下:
/**
*向对象池中增加对象实例
*/
void addObject() throws Exception, IllegalStateException,
UnsupportedOperationException;
/**
* 从对象池中获取对象
*/
T borrowObject() throws Exception, NoSuchElementException,
IllegalStateException;
/**
* 失效非法的对象
*/
void invalidateObject(T obj) throws Exception;
/**
* 释放对象至对象池
*/
void returnObject(T obj) throws Exception;
对象工厂时commons-pool2
框架中用于生成对象的核心环节,业务方在使用过程中需要自己实现对应的工程实现类,通过工厂模式,实现对象池于对象的生成与实现过程细节的解耦,每一个对象池都有对象工厂的成员变量,如此实现对象池本身和对象的生成逻辑解耦。
public GenericObjectPool(final PooledObjectFactory<T> factory) {
this(factory, new GenericObjectPoolConfig<T>());
}
public GenericObjectPool(final PooledObjectFactory<T> factory,
final GenericObjectPoolConfig<T> config) {
super(config, ONAME_BASE, config.getJmxNamePrefix());
if (factory == null) {
jmxUnregister(); // tidy up
throw new IllegalArgumentException("factory may not be null");
}
this.factory = factory;
idleObjects = new LinkedBlockingDeque<>(config.getFairness());
setConfig(config);
}
public GenericObjectPool(final PooledObjectFactory<T> factory,
final GenericObjectPoolConfig<T> config, final AbandonedConfig abandonedConfig) {
this(factory, config);
setAbandonedConfig(abandonedConfig);
}
可以看到对象池的构造方法,都依赖于构造工厂PooledObjectFactory
,在生成对象的时候,基于对象池中定义的参数和对象构造工厂来生成。
/**
* 向对象池中增加对象,一般在预加载的时候会使用该功能
*/
@Override
public void addObject() throws Exception {
assertOpen();
if (factory == null) {
throw new IllegalStateException(
"Cannot add objects without a factory.");
}
final PooledObject<T> p = create();
addIdleObject(p);
}
create()
方法基于对象工厂来生成对象:
final PooledObject<T> p;
try {
p = factory.makeObject();
if (getTestOnCreate() && !factory.validateObject(p)) {
createCount.decrementAndGet();
return null;
}
} catch (final Throwable e) {
createCount.decrementAndGet();
throw e;
} finally {
synchronized (makeObjectCountLock) {
makeObjectCount--;
makeObjectCountLock.notifyAll();
}
}
此处确认了factory.makeObject()
的操作,也印证了上述的推测,基于对象工厂来生成对应的对象。
为了更好的能够实现对象池中对象的使用以及跟踪对象的状态,commons-pool2
框架中使用了池化对象PooledObject
的概念,PooledObject
本身是泛型类,并提供了getObject()
获取实际对象的方法。
通过查看源码PooledObjectState
枚举,下面为池对象多有可能处于的状态。
public enum PooledObjectState {
//在空闲队列中,还未被使用
IDLE,
//使用中
ALLOCATED,
//在空闲队列中,当前正在测试是否满足被驱逐的条件
EVICTION,
//不在空闲队列中,目前正在测试是否可能被驱逐。因为在测试过程中,试图借用对象,并将其从队列中删除。
//回收测试完成后,它应该被返回到队列的头部。
EVICTION_RETURN_TO_HEAD,
//在队列中,正在被校验
VALIDATION,
//不在队列中,当前正在验证。该对象在验证时被借用,由于配置了testOnBorrow,
//所以将其从队列中删除并预先分配。一旦验证完成,就应该分配它。
VALIDATION_PREALLOCATED,
//不在队列中,当前正在验证。在之前测试是否将该对象从队列中移除时,曾尝试借用该对象。
//一旦验证完成,它应该被返回到队列的头部。
VALIDATION_RETURN_TO_HEAD,
//无效状态(如驱逐测试或验证),并将/已被销毁
INVALID,
//判定为无效,将会被设置为废弃
ABANDONED,
//正在使用完毕,返回池中
RETURNING
}
状态理解
abandoned
:被借出后,长时间未被使用则标记为该状态,如代码所示,当对象处于allocated
状态,即被借出使用中,距离上次被使用的时间超过了设置的getRemoveAbandonedTimeout
则被标记为废弃。
private void removeAbandoned(final AbandonedConfig abandonedConfig) {
// Generate a list of abandoned objects to remove
final long now = System.currentTimeMillis();
final long timeout =
now - (abandonedConfig.getRemoveAbandonedTimeout() * 1000L);
final ArrayList<PooledObject<T>> remove = new ArrayList<>();
final Iterator<PooledObject<T>> it = allObjects.values().iterator();
while (it.hasNext()) {
final PooledObject<T> pooledObject = it.next();
synchronized (pooledObject) {
if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
pooledObject.getLastUsedTime() <= timeout) {
pooledObject.markAbandoned();
remove.add(pooledObject);
}
}
}
```
// 流程理解
//1.对象真实是存储在哪里?
private PooledObject<T> create() throws Exception {
.....
final PooledObject<T> p;
try {
p = factory.makeObject();
.....
allObjects.put(new IdentityWrapper<>(p.getObject()), p);
return p;
}
查看allObjects
,所有对象都存储于ConcurrentHashMap
,除了被杀掉的对象。
对象池提供了许多配置项,GenericObjectPool
默认基础对象池中可以通过构造函数传参传入GenericObjectPoolConfig
,同意也可以传入其底层实现的基础类BaseObjectPoolConfig
。
maxTotal
:对象池中最大使用数量,默认为8.
maxIdle
:对象池中空闲对象的最大数量,默认为8.
minIdle
:对象池中空闲对象的最小数量,默认为8.
lifo
:当获取对象池中的空闲实例时,是否需要遵循后进先出的原则,默认为true
。
blockWhenExhausted
:当对象处于exhausted
状态,即可用实例为空时,是否阻塞来获取实例的线程,默认为true
。
fairness
:当对象池处于exhausted
状态,即可用实例为空时,大量线程在同时阻塞等待获取可用的实例,fairness
配置来控制是否启用公平锁算法,即先到先得,默认为false
,这一项的前提是blockWhenExhausted
配置为true
。
maxWaitMillis
:最大阻塞时间,当对象池处于exhausted
状态,即可用实例为空时,大量线程同时阻塞等待获取可用的实例,如果阻塞时间超过了maxWaitMillis
将会抛出异常,当此值为负时,代表无限期阻塞直到可用,默认为-1
;
testOnCreate
:创建对象前是否校验(即调用工厂的validateObject()
方法),如果校验失败,那么borrowObject()
返回将失败,默认为false
。
testOnBorrow
:取用对象前是否校验,默认为false
。
testOnReturn
:返回对象池前是否检验,即调用工程的returnObject()
,若检验失败会销毁对象而不是返回池中,默认为false
。
testBetweenEvictionRunsMillis
:驱逐周期,默认为-1代表不仅区局测试。
testWhileIdle
:处于idle队列中即闲置的对象是否被驱逐器进行驱逐验证,当该对象上次运行时间距当前超过setTimeBetweenEvictionRunsMillis(long)
设置的值,将会被驱逐验证,调用validateObject()
方法,若验证成功,对象将会销毁,默认为false
。
通过GenericObjectPool
源码来分析对象池如如何实现对对象的生命周期进行管理,包括对整个对象池中对象数量的控制等逻辑。
对象池中使用双端队列LinkedBlockingDeque
来进行对象存储,LinkedBlockdingDeque
队列支持FIFO
和FILO
两种策略,基于AQS
来实现队列操作的协同。
LinkedBlockingDeque
提供了队尾和队头的插入和移除元素的操作,相关操作都进行了加入重入锁的加锁操作,队列中设置notFull
和notEmpty
两个状态变量,当对队列进行元素的操作的时候会触发对应的await
和notify
等操作。
/**
* 第一个节点
* Invariant: (first == null && last == null) ||
* (first.prev == null && first.item != null)
*/
private transient Node<E> first; // @GuardedBy("lock")
/**
* 最后一个节点
* Invariant: (first == null && last == null) ||
* (last.next == null && last.item != null)
*/
private transient Node<E> last; // @GuardedBy("lock")
/** 当前队列长度 */
private transient int count; // @GuardedBy("lock")
/** 队列最大容量 */
private final int capacity;
/** 主锁 */
private final InterruptibleReentrantLock lock;
/** 队列是否为空状态锁 */
private final Condition notEmpty;
/** 队列是否满状态锁 */
private final Condition notFull;
队列的核心点为:
队列中所有的移入元素,移出、初始化构造元素都是基于主锁进行加锁操作。
队列的offer
和pull
支持设置超时时间参数,主要是通过两个状态COndition
来进行协调操作。如进行offer
操作时,如果操作不成功,则基于notNull
状态对象进行等待。
public boolean offerFirst(final E e, final long timeout, final TimeUnit unit)
throws InterruptedException {
Objects.requireNonNull(e, "e");
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
while (!linkFirst(e)) {
if (nanos <= 0) {
return false;
}
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
如进行pull
操作的时候,如果操作不成功,则对notEmpty
进行等待操作。
public E takeFirst() throws InterruptedException {
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null) {
notEmpty.await();
}
return x;
} finally {
lock.unlock();
}
}
反之当操作成功的时候,则进行唤醒操作,如下所示:
private boolean linkLast(final E e) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity) {
return false;
}
final Node<E> l = last;
final Node<E> x = new Node<>(e, l, null);
last = x;
if (first == null) {
first = x;
} else {
l.next = x;
}
++count;
notEmpty.signal();
return true;
}
browObject
的逻辑首先根据AbandonedConfig
配置判断是否取用对象前进行清理操作,
之后从idleObject
中尝试获取对象,获取不到就创建新的对象
//1、尝试从双端队列中获取对象,pollFirst方法是非阻塞方法
p = idleObjects.pollFirst();
if (p == null) {
p = create();
if (p != null) {
create = true;
}
}
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0) {
//2、没有设置最大阻塞等待时间,则无限等待
p = idleObjects.takeFirst();
} else {
//3、设置最大等待时间了,则阻塞等待指定的时间
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
}
blockWhenExhausted
是否设置为true
(这个配置的意思是当对象池的active
状态的对象数量已经达到最大值maxinum
时是否进行阻塞直到有空闲对象)。borrowMaxWaitMillis
属性等待可用对象。调用allocate
使得状态更改为ALLOCATED
状态。
有可用对象后调用工厂的factory.activateObject
方法激活对象,如果发生错误,调用destory
方法来销毁对象。
当getTestOnBorrow
设置为true
时,用factory.validObject(p)
对对象进行校验,通过校验后执行下一步。
调用updateStateBorrow
方法,在对象被成功借出后更新一些统计项,例如返回对象池的对象个数等。
//....
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
//....
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
assertOpen();
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
(getNumIdle() < 2) &&
(getNumActive() > getMaxTotal() - 3) ) {
removeAbandoned(ac);
}
PooledObject<T> p = null;
// Get local copy of current config so it is consistent for entire
// method execution
final boolean blockWhenExhausted = getBlockWhenExhausted();
boolean create;
final long waitTime = System.currentTimeMillis();
while (p == null) {
create = false;
p = idleObjects.pollFirst();
if (p == null) {
p = create();
if (p != null) {
create = true;
}
}
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0) {
p = idleObjects.takeFirst();
} else {
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
if (p == null) {
throw new NoSuchElementException(
"Timeout waiting for idle object");
}
} else {
if (p == null) {
throw new NoSuchElementException("Pool exhausted");
}
}
if (!p.allocate()) {
p = null;
}
if (p != null) {
try {
factory.activateObject(p);
} catch (final Exception e) {
try {
destroy(p, DestroyMode.NORMAL);
} catch (final Exception e1) {
// Ignore - activation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to activate object");
nsee.initCause(e);
throw nsee;
}
}
if (p != null && getTestOnBorrow()) {
boolean validate = false;
Throwable validationThrowable = null;
try {
validate = factory.validateObject(p);
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
if (!validate) {
try {
destroy(p, DestroyMode.NORMAL);
destroyedByBorrowValidationCount.incrementAndGet();
} catch (final Exception e) {
// Ignore - validation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to validate object");
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
return p.getObject();
}
returnObject
的过程执行逻辑markRetuningState
方法将状态更改为RETURNING
。testOnReturn
配置调用PooledObjectedFactory
的validateObject
方法以进行可用性检查。如果检查失败,则调用destory
消除该对象,然后调用idle
确保池中有IDLE
状态对象可用,如果没有,则调用create
方法创建一个新对象。PooledObjectFactory
的passivateObject
方法进行反初始化操作。deallocate
将状态更改为IDLE
。LIFO
(后进先出)配置将对象放置在队列的开头或结尾。在获取对象的时候,会从双端队列中阻塞等待获取元素(或者创建新对象),但是如果是应用程序的异常。一直未调用returnObject
或者invalidObject
的时候,可能会出现对象池中的对象一直上升,到达设置的上限后再去调用borrowObject
的时候就会出现一直等待或者是等待超时而无法获取对象的情况。
为了避免上述问题,有两种自我保护机制
基于阈值的检测
从对象池中获取对象的时候会检验当前对象池的活跃对象和空闲对象的数量占比,当空闲独享非常少,活跃对象非常多的时候,会触发空闲对象的回收。具体的校验规则为:如果当前对象池中少于2个idle
状态的对象或者active
数量>
最大对象数-3的时候,在borrow
对象的时候启动泄露清理。通过AbandonedConfig.setRemoveAbandonedOnBorrow
为true
进行开启。
//根据配置确定是否要为标签删除调用removeAbandoned方法
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) && (getNumActive() > getMaxTotal() - 3) ) {
removeAbandoned(ac);
}
异常调度线程检测
AbabdonedConfig.setRemoveAbaodonedOnMaintenance
设置为true
后,以维护任务运行的时候会进行泄漏对象的清理。同规格设置setTimeBetweenEvictionRunsMillis
来设置维护任务执行的时间间隔。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tlDVU1Jb-1640599013012)(…/Documents/Notes/2735fd9505adf66b0dfe0c8b4a0ec44a.png)]
检测和回收实现逻辑分析
在构造方法内部逻辑的最后调用了startEvictor
方法,这个方法的作用时在构造完对象池后,启动回收器来监控回收空闲对象。startEvictor
定义在GenericObjectPool
的父类BaseGenericObjectPool
(抽象)类中。
构造器中会执行如下的设置参数:
public final void setTimeBetweenEvictionRunsMillis(
final long timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
startEvictor(timeBetweenEvictionRunsMillis);
}
当且仅当设置了timeBetweenEvictionRunsMillis
参数后才会开启定时清理任务。
final void startEvictor(final long delay) {
synchronized (evictionLock) {
EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);
evictor = null;
evictionIterator = null;
//如果delay<=0则不会开启定时清理任务
if (delay > 0) {
evictor = new Evictor();
EvictionTimer.schedule(evictor, delay, delay);
}
}
}
回收逻辑
判断对象池可以进行回收的时候,直接调用了destory
机械能回收。
boolean evict;
try {
evict = evictionPolicy.evict(evictionConfig, underTest,
idleObjects.size());
} catch (final Throwable t) {
// Slightly convoluted as SwallowedExceptionListener
// uses Exception rather than Throwable
PoolUtils.checkRethrow(t);
swallowException(new Exception(t));
// Don't evict on error conditions
evict = false;
}
if (evict) {
// 如果可以被回收则直接调用destroy进行回收
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
}
为了提高回收的效率,在回收策略判断对象的状态不是evict
的时候,也会进行进一步的状态判断和处理,具体逻辑如下:
尝试激活对象,如果激活失败则任务该对象已经不再存活,直接调用destory
进行销毁。
在激活对象成功的情况下,会通过validateObject
方法校验对象状态,如果校验失败,则说明对象不可用,需要进行销毁。
boolean active = false;
try {
// 调用activateObject激活该空闲对象,本质上不是为了激活,
// 而是通过这个方法可以判定是否还存活,这一步里面可能会有一些资源的开辟行为。
factory.activateObject(underTest);
active = true;
} catch (final Exception e) {
// 如果激活的时候,发生了异常,就说明该空闲对象已经失联了。
// 调用destroy方法销毁underTest
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
}
if (active) {
// 再通过进行validateObject校验有效性
if (!factory.validateObject(underTest)) {
// 如果校验失败,说明对象已经不可用了
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
} else {
try {
/*
*因为校验还激活了空闲对象,分配了额外的资源,那么就通过passivateObject把在activateObject中开辟的资源释放掉。
*/
factory.passivateObject(underTest);
} catch (final Exception e) {
// 如果passivateObject失败,也可以说明underTest这个空闲对象不可用了
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
}
}
}
passivateObject(PooledObject<T> p)
和passivateObject(PooledObject<T> p)
即对象的激活和钝化方法有有什么作用? 对象在使用完返回对象池时,如果校验失败则之间销毁,如果检验通过则需要先钝化对象之后再存入空闲队列。至于激活对象的方法再上述取用对象时也会先激活之后取出。因此处于空闲和使用中的对象除去状态不一致,也可以通过激活于钝化的方式增加新的差异。
例如:建立ElasticSearch连接池,每个对象都是一个带有IP和端口的连接实例,很明显访问es集群的是多个不同IP,所以每次访问的IP不一定相同。我们可以将激活操作设置为对象赋值IP和端口,钝化操作设置为将IP和端口归为默认值或者为空。
public void returnObject(final T obj) {
final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
//....
//校验失败直接销毁 return
//...
try {
factory.passivateObject(p);
} catch (final Exception e1) {
swallowException(e1);
try {
destroy(p, DestroyMode.NORMAL);
} catch (final Exception e) {
swallowException(e);
}
try {
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
updateStatsReturn(activeTime);
return;
}
//......
//返回空闲队列
}
BaseGenericObjectPool
或者实现基础接口PoolObjectFactory
,并按照业务需求重写对象的创建、销毁、校验、激活、钝化方法,其中销毁多为连接的关闭,置空等。GenericObjectPool
或者实现基础接口ObjectPool
,建议使用前者,其提供了空闲对象驱逐检测机制(即将空闲队列中长时间未使用的对象销毁,降低内存占用等),以及提供了很多对象的基本信息,例如对象最后被使用的时间、使用对象前是否检验等。GenericObjectPoolConfige
或者继承BaseObjectPoolConfig
,来增加对线程池的配置控制,建议使用前者,其为我们实现了基本方法,只需要自己添加需要的属性即可。需要为对象池设置空闲队列最大最小值,默认最大最小值,默认最大为8往往不能满足需求。
private volatile int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private volatile int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
public static final int DEFAULT_MAX_IDLE = 8;
public static final int DEFAULT_MIN_IDLE = 0;
对象池设置maxWaitMillis
属性,即取用对象最大等待时间。
使用完对象及时释放对象,将对象返回池中,特别是发生了异常也要通过try...catch...finally
的方式确保释放,避免占用资源。
为什么设置maxWaitMillis
?取用对象的方法如下:
public T borrowObject() throws Exception {
return borrowObject(getMaxWaitMillis());
}
可以看到默认的最大等待时间为-1L
:
private volatile long maxWaitMillis =
BaseObjectPoolConfig.DEFAULT_MAX_WAIT_MILLIS;
//....
public static final long DEFAULT_MAX_WAIT_MILLIS = -1L;
查看取用对象逻辑,blockWhenExhausted
默认为true
意思是当池中不存在空闲对象时,又来取用对象,线程将会阻塞直到有新的可用对象。由上可以得知-1L
会执行idleObjects.takeFirst()
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
//.......
final boolean blockWhenExhausted = getBlockWhenExhausted();
boolean create;
final long waitTime = System.currentTimeMillis();
while (p == null) {
//.......
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0) {
p = idleObjects.takeFirst();
} else {
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
}
}
}
如下,阻塞队列将会一直阻塞,直到有了空闲对象才停止阻塞,这样的设定将会在吞吐提高时造成大面积阻塞影响。
public E takeFirst() throws InterruptedException {
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null) {
notEmpty.await();
}
return x;
} finally {
lock.unlock();
}
}
其他注意事项时回收资源,即调用public void returnObject(final T obj)
方法,对象池对于是否使用完对象是无感知的,需要低啊用该方法回收对象,特别是异常发生的时候。
try{
item = pool.borrowObject();
} catch(Exception e) {
log.error("....");
} finally {
pool.returnObject(item);
}