使用MySQL的链接池可能存在如下问题:在客户端连接池中的一条空闲链接,可能是一条已经被MySQL服务端关闭掉的链接。在获取连接进行重新请求的时候,连接会被置为无效,并重新启用新的连接发起请求。常见的连接池都有这样的操作,好奇不好奇,连接池底层是什么数据结构?
或者说,假设已经封装好了连接的对象,你要如何实现一个连接池?假设要这样进行设计是否可行?请求的时候,从连接池中获取一个连接,请求完成后,更新连接的最近使用时间,将连接放回到连接池。如果获取到一个长时间没有使用的连接,就从连接池中将连接移除,然后直接创建一个新的连接。
我们带着这些问题,来看一下 database/sql
的实现,看看官方的代码究竟是怎么写的,有没有哪些是我们想不到的?先从两个主线下手,①从连接池中获取连接,②使用完后将连接放回到连接池中。
首先,从 DB 的结构体上,我们可以看到两个比较关键的字段。难道连接池是通过切片和 map 的组合实现的?我们直接去看 conn 的方法,这个方法体用来创建新的连接,或者从连接池中获取缓存的连接,下面的内容参考 go1.17
的源码。
type DB struct {
freeConn []*driverConn
connRequests map[uint64]chan connRequest
}
conn
方法实在太长了,截取其中的一段代码来瞅瞅,看来,空闲连接是通过数组来保存的。当要从连接池获取连接的时候,直接读取 freeConn 的第一个元素,然后将切片的所有元素向前移动一个位置,之后再将切片的长度减1。但这里有一个疑问,虽然我们修改 freeConn 的长度,但内存中 numFree-1
位置的对象依旧是存在的。在一些其它的源代码中,会着重将这个位置的对象设置为 nil,然后再缩减切片的长度。
取出来的连接,还要判断连接是否已经过期了,过期时间的判断应该和连接最后一次使用时间做比较,如果距离这个链接最后一次使用时间已经超过了设置的最大生命周期,则会抛出一个 ErrBadConn。上层逻辑该如何处理这个错误呢?如果当前连接池的个数没有达到上限,只需要重新创建一个新的连接。如果达到了连接的上限,就需要阻塞等待了。
// conn returns a newly-opened or cached *driverConn.
{
// 省略
// Prefer a free connection, if possible.
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
}
}
创建新的连接没有什么特别的,重点关注一下连接池达到上限,阻塞等待的情况。我们该如何实现这个阻塞逻辑呢?用最基本的思路去猜一下,哪怕猜错了也没啥问题。代码已经在那里了,猜与不猜起不了任何作用。我需要有一个判断标准,当前是否有空闲的连接了。可以主动去检测当前有没有空闲的连接,也可以阻塞在一个信号上,当有空闲连接的时候唤醒我。
另外,阻塞连接的顺序需要保证,如果阻塞在信号量上,go协程的顺序其实是有保障的。另外,唤醒之后,在尚未真正发起请求之前,是不是请求已经超时了,也需要做一次时间校验。基于性能的考虑,肯定是需要阻塞在一个信号上的,我们看一下源码。
我们提到的信号量就是源码中的 req,新的请求会阻塞在这个 req 上,通过 select 来监听当前是否有可用的连接。req 是 chan 类型,缓冲区为1。写入的一端是当有连接被释放的时候,当 req 中读取到空闲的连接,依旧需要判断连接的有效性,在使用之前,还需要清空上次请求的信息。这有点类似 sync.Pool 的使用。
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := nowFunc()
// Timeout the connection request with the context.
select {
case ret, ok := <-req:
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
if !ok {
return nil, errDBClosed
}
// Only check if the connection is expired if the strategy is cachedOrNewConns.
// If we require a new connection, just re-use the connection without looking
// at the expiry time. If it is expired, it will be checked when it is placed
// back into the connection pool.
// This prioritizes giving a valid connection to a client over the exact connection
// lifetime, which could expire exactly after this point anyway.
if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
db.mu.Lock()
db.maxLifetimeClosed++
db.mu.Unlock()
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
// Reset the session if required.
if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
直观看的话,其实并不复杂,因为我们只是想搞清楚它的实现原理,但具体到实操上,要考虑的细节就会特别多。比如,使用缓冲区为1的chan、使用 mutex 写锁来限制并发等。
只看到这个方法名字,我以为方法内部是加锁的逻辑,但方法内部非常简单,应该是想表示调用端需要加锁。因为 connRequests 使用这个方法返回值作为 key,所以,这个 key 唯一标志了一个请求链接,我特别想看看这个 key 是怎么处理的。
// nextRequestKeyLocked returns the next connection request key.
// It is assumed that nextRequest will not overflow.
func (db *DB) nextRequestKeyLocked() uint64 {
next := db.nextRequest
db.nextRequest++
return next
}
但实际代码确实比较简单,成员变量 db.nextRequest 简单自增。最主要的是,db.nextRequest 会一直自增,而没有考虑整数溢出。为什么官方不来个整数循环呢,比如自增到了 100w 的时候开始从 0 计数(go atomic 原子操作中就展示了循环计数)。
不过,就算这个整数溢出了也没啥的问题,而且,理论上等待连接的请求书数不会特别多,大多数返回的都特别快,也就不会调用到这个方法。
我们观察一下 connRequests 这个 map 结构的删除操作。前面介绍了 map 的写入:给等待的请求生成一个自增ID,存储到 connRequests 里面,等别的正在使用的连接释放之后,就可以被用来处理这些等待的请求。
如何从 map 结构体中取出一个等待的请求呢?map 内部存储的数据是无序的,下面代码的实现是直接 for 循环,取到一个数据之后,break 跳出循环。在这个场景下,从 map 里任意取出一个元素,好像没有啥别的好的方法。
// Satisfy a connRequest or put the driverConn in the idle pool and return true
// or return false.
// putConnDBLocked will satisfy a connRequest if there is one, or it will
// return the *driverConn to the freeConn list if err == nil and the idle
// connection limit will not be exceeded.
// If err != nil, the value of dc is ignored.
// If err == nil, then dc must not equal nil.
// If a connRequest was fulfilled or the *driverConn was placed in the
// freeConn list, then true is returned, otherwise false is returned.
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}
另外,我们着重关注一下这种处理模式,很多代码库都有使用这种模式:
putConnDBLocked
函数从全局的 db.connRequests
读取到一个 channel 对象之后,首先从全局对象中将 channel 清空,然后向这个 channel 中写入连接数据。而等待连接的另一端正在监听这个 channel,当有数据写入之后,它就可以继续执行。
回到文章开篇,sql 连接池底层结构就是一个切片类型,模拟一个队列结构,实现两个操作:从池子中读取一个连接,向池子中放回一个连接。但实现上对这个过程做了优化,当存在等待连接的请求时,可以直接复用已经释放的连接,省去放回池子的过程。
看了网上的讨论,可以查看下面的链接:mysql issues。然后查看官方的介绍,现在已经有了现成的解决方法。官方地址:Connection pool and timeouts
下面是beego中的设置MySQL连接池的方法:
err = orm.RegisterDataBase("default", "mysql", iniConfig.String("mysql"))
if err != nil {
logs.Error("db register data error:%v", err)
}
mdb, err := orm.GetDB("default")
if err != nil {
panic(fmt.Errorf("get db error:%s", err))
}
mdb.SetConnMaxLifetime(time.Second * 20)
mdb.SetMaxIdleConns(10)
mdb.SetMaxOpenConns(30)
如上面的代码,主要用来修改连接池中每个链接的最长生命时间、最大空闲链接数以及最大可以打开的链接。
因为在orm中并没有暴露SetConnMaxLifetime的方法,所以需要获取*DB对象来处理
mdb, err := orm.GetDB("default")