Jigsaw 中线程池的设计对我的触发很大,因为之前没有涉及到这方面的项目,多线程以及线程池之类感念还比较神秘,通过对 jigsaw 的学习,这层神秘面纱也被褪去,同时很是感慨大师们的设计之妙。现在提一个问题,如何设计处理 http 请求的程序?如果我没有看过 jigsaw ,我是无从下手的,当然,从以往的学习中,只能从概念上提出线程池的解决方案,但具体怎么做,如何操作,我还是没有什么头绪。
当启动一个 http 服务器后,客户端会有多少请求?什么时候到?怎么样尽快处理每个请求,而不让用户长时间等待?这些问题只有多线程才能解决,那么在服务器启动时,可以启动指定数量的线程放在一个池中,每个线程都有不同的状态,当有请求过来时,便从池中取得一个空闲线程来处理这个请求,并改变这个线程的状态,处理完后,再把线程放回池中,当有其他请求过来时,再从池中取得线程,这样可以循环利用所有线程,实现资源的最大利用率。下面就来仔细看看 jigsaw 的多线程设计。
对于每个 http 请求,可以抽象为一个客户对象,用来封装 http 请求,在客户工厂类中就需要定义放置这些客户对象以及处理这些客户对象的线程的容器, jigsaw 使用了链表来保存这些客户对象,分别构成了客户池和线程池。一个新请求到达,就从客户池请求一个客户对象封装这个请求,并从线程池请求一个空闲线程,把这个客户对象交给这个线程处理,而不会影响其他的请求。
如何设计这个客户池呢? jigsaw 提供了一个抽象类 LRUList ,它包含两个属性 :
protected LRUNode head ;// 记录这个链表的表头节点
protected LRUNode tail ;// 记录这个链表的表尾节点
LRUNode 类就是客户池链表的节点类型,它实现了 LRUAble 接口,并包含它前一个节点和后一个节点的引用,这是 LRUNode 类的属性 :
protected LRUAble prev ;// 前一个节点的引用
protected LRUAble next ;// 后一个节点的引用
在客户工厂类中的客户池实际上是一个继承了 LRUList 的类 SyncLRUList ,因为后面对客户池的操作是在多线程环境下,该类中实现的抽象方法都是同步的。从客户池中获取一个客户对象是调用链表的 removeTail() 方法,下面是这个方法以及相关代码 :
public final synchronized LRUAble removeTail() {
// 如果链表的尾节点的前节点不是链表的头节点,则移除该尾节点的前节点,并将移除的节点返回给调用者
if ( tail . prev != head )
return remove (tail.prev) ;
return null;
}
public final synchronized LRUAble remove(LRUAble node) {
_remove(node) ;
node.setNext((LRUAble) null ) ;
node.setPrev((LRUAble) null ) ;
return node ;
}
private final synchronized void _remove(LRUAble node) {
LRUAble itsPrev, itsNext ;
itsPrev = node.getPrev() ;
// note assumption: if its prev is not null, neither is its next
if (itsPrev== null ) return ;
itsNext = node.getNext() ;
itsPrev.setNext(itsNext) ;
itsNext.setPrev(itsPrev) ;
}
由上面可知,每次从客户池取客户对象,都是取的表尾节点的上一个节点(如果不是表头节点),然后就可以把客户套接字绑定到该客户对象上,处理 http 请求
那么用于处理客户对象的线程池又是如何设计的呢?
Jigsaw 提供了 ThreadCache 类,代表了这个线程池,它也有两个属性 :
protected CachedThread freehead = nul l; // 被缓存的线程,是链表的表头
protected CachedThread freetail = null ; // 被缓存的线程,是链表的表尾
在客户工厂类的初始化过程中,便对线程池的池大小进行了设置,并初始化了这个线程池,启动了指定个数的线程放置在池中(实质上也是一个链表),最初都处于阻塞状态。
CachedThread 是一个继承了 Thread 的类,它的设计思想也和前面 LRUNode 一样,作为一个链表的节点,都需要保存上一个节点和后一个节点的引用,请看代码 :
CachedThread next = null ; // 当前线程在链表中的下一个线程的引用
CachedThread prev = null ; // 当前线程在链表中的上一个线程的引用
另外,它还包含一个属性 :
Runnable runner = null;
这就是需要交给线程处理的客户对象,因为客户对象实现了 Runnable 接口,从客户池中取得的客户对象就可以赋给 runner ,由线程去调用 runner 的 run() 方法处理请求
另外,它包含了其他几个重要的标志
boolean alive = true; // 该线程是否活动
boolean terminated = false; // 该线程处理客户对象是否终止
boolean started = false ; // 该线程是否已启动
boolean firstime = true ; // 该线程是否第一次启动
这是 CachedThread 的 run() 方法,最开始在调用 waitForRunner() 会被阻塞,直到该线程获得一个客户对象
public void run() {
try {
while ( true ) {
// Wait for a runner:
Runnable torun = waitForRunner();
// If runner, run:
if ( torun != null )
torun.run(); //torun 实际上是一个 SocketClient 对象
// If dead, stop
if ( ! alive )
break ;
}
} finally {
cache .isDead( this );
}
}
waitForRunner() 方法首先会判断 runner 属性是否为空,不为空就会把它返回给调用者,返回之前会把 runner 重新置为 null ;其次判断该线程是否为第一次启动,是第一次就将线程阻塞;最后判断当前线程对象是否是活动的,是活动的就阻塞它。
synchronized Runnable waitForRunner() {
boolean to = false ;
while ( alive ) {
// Is a runner available ?
if ( runner != null ) {
Runnable torun = runner ;
firstime = false ;
runner = null ;
return torun;
} else if ( firstime ) {
// This thread will not be declared free until it runs once:
try {
wait();
} catch (InterruptedException ex) {}
} else if ( alive = cache .isFree( this , to) ) {
// Notify the cache that we are free, and continue if allowed:
try {
int idleto = cache .getIdleTimeout();
to = false ;
if ( idleto > 0 ) {
wait(idleto);
to = ( runner == null );
} else {
wait();
}
} catch (InterruptedException ex) {}
}
}
return null ;
}
Wakeup 方法用于唤醒当前阻塞的线程
synchronized boolean wakeup(Runnable runnable) {
if ( alive ) {
runner = runnable;
if ( ! started )
this .start();
notify();
return true ;
} else {
return false ;
}
}