初始化一个 Goroutine Pool 池,这个Pool维护了一个类似栈的FILO队列 ,里面存放负责处理任务的Worker,然后每有一个请求就开启一个协程。
之后
参考资料:《Go语言实战》7.2章 Pool
package main
import (
"io"
"log"
"sync"
"errors"
)
// Pool管理一组可以安全在多个goroutine间共享的资源
// 被管理的资源必须实现io.Closer接口
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
// ErrPoolClosed表示请求了一个已经关闭的池
var ErrPoolClosed = errors.New("Pool has been closed.")
//New创建一个用来管理资源的池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("Size value too small.")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
//Acquire从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
// 检查是否有空闲的资源
case r, ok := <-p.resources:
log.Println("Acquire:", "Shared Resource")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
//如果没有空闲资源可用,提供一个新资源
default:
log.Println("Acquire:", "New Resource")
return p.factory()
}
}
//Release将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
//加锁,保证本操作和Close操作的安全
p.m.Lock()
defer p.m.Unlock()
//如果池已经关闭,销毁这个资源
if p.closed {
r.Close()
return
}
select {
//将这个资源放入队列
case p.resources <- r:
log.Println("Release:", "In Queue")
// 如果队列已满,则关闭这个资源
default:
log.Println("Release:", "Closing")
r.Close()
}
}
// Close会让资源池停止工作,并关闭所有现在的资源
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
//将池关闭
p.closed = true
//清空通道里的资源之前,将通道关闭,防止死锁
close(p.resources)
//关闭资源
for r := range p.resources {
r.Close()
}
}