package main
import (
"time"
"github.com/go-redis/redis"
)
func main(){
conf := &RedisConfig{
Addr: "codis.devops-k8s.sg2.i.sz.shopee.io:31923",
DB: 0,
PoolSize: 10, // 连接池大小
IdleTimeout: 30, // 客户端关闭空闲连接的时间
DialTimeout: 1,
ReadTimeout: 1,
WriteTimeout: 1,
}
InitCache(conf)
// 操作缓存
Cache.client.Set("key","value", 1*time.Second)
}
var Cache *RedisCache
type RedisConfig struct {
Addr string `yaml:"addr"`
DB int `yaml:"db"`
PoolSize int `yaml:"pool_size"`
IdleTimeout int `yam:"idle_timeout"`
DialTimeout int `yam:"dial_timeout"`
ReadTimeout int `yam:"read_timeout"`
WriteTimeout int `yam:"write_timeout"`
}
type RedisCache struct {
client *redis.Client
}
func InitCache(conf *RedisConfig) {
Cache = &RedisCache{}
Cache.client = redis.NewClient(&redis.Options{
Addr: conf.Addr,
DB: conf.DB,
PoolSize: conf.PoolSize,
IdleTimeout: time.Duration(conf.IdleTimeout) * time.Second,
DialTimeout: time.Duration(conf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(conf.WriteTimeout) * time.Second,
})
}
type baseClient struct {
opt *Options
connPool pool.Pooler
limiter Limiter
process func(Cmder) error
processPipeline func([]Cmder) error
processTxPipeline func([]Cmder) error
onClose func() error // hook called when client is closed
}
type Cmder interface {
Name() string
Args() []interface{}
stringArg(int) string
readReply(rd *proto.Reader) error
setErr(error)
readTimeout() *time.Duration
Err() error
}
type cmdable struct {
process func(cmd Cmder) error
}
type Client struct {
baseClient
cmdable
ctx context.Context
}
func NewClient(opt *Options) *Client {
opt.init()
c := Client{
baseClient: baseClient{
opt: opt,
connPool: newConnPool(opt), // 生成管理连接实例
},
}
c.baseClient.init() // 客户端初始化,比较重要的是设置了 process:c.process = c.defaultProcess
c.init() // 就是把 Client的 process 设置为 c.defaultProcess
return &c
}
从上面代码我们看出来 Client 其实就是封装了 baseClient 和 cmdable。其中
但我们执行一个Redis 命令时发生了什么呢?以 Set 方法为例子
func (c *cmdable) Set(key string, value interface{}, expiration time.Duration) *StatusCmd {
args := make([]interface{}, 3, 4)
args[0] = "set"
args[1] = key
args[2] = value
if expiration > 0 {
if usePrecise(expiration) {
args = append(args, "px", formatMs(expiration))
} else {
args = append(args, "ex", formatSec(expiration))
}
}
cmd := NewStatusCmd(args...)
c.process(cmd)
return cmd
}
我们在执行命令的时候会调用到 c.process(cmd) 这个函数,在之前我们提到了 c.process = c.defaultProcess,defaultProcess 接收一个 Cmder 接口。
那我们再看一下 defaultProcess 做了一些什么?
func (c *baseClient) defaultProcess(cmd Cmder) error {
// 重事次数
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
// 获取到连接
cn, err := c.getConn()
if err != nil {
cmd.setErr(err)
if internal.IsRetryableError(err, true) {
continue
}
return err
}
// 往网络连接中写入数据
err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmd)
})
if err != nil {
// 释放连接
c.releaseConn(cn, err)
cmd.setErr(err)
if internal.IsRetryableError(err, true) {
continue
}
return err
}
// 读取数据
err = cn.WithReader(c.cmdTimeout(cmd), cmd.readReply)
c.releaseConn(cn, err)
if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
continue
}
return err
}
return cmd.Err()
}
总体的流程就是:获取到连接 → 往网络连接中写入数据 → 读取数据→ 释放连接→ 返回结果。其中比较重要的是 c.getConn(),这里会从链接池里面获取链接。 下面我们就看下 Go-Redis 的链接池是怎么实现的
Go-Redis 的链接池链接池有三种:
这里我们重点说明 ConnPool,因为 StickyConnPool 和 SingleConnPool 是根据 ConnPool 封装出来的。
一个典型的链接池设计都会包含以下几个功能:连接建立、管理连接、连接释放
go 的链接池都实现了 Pooler 这个接口,实现了 连接建立,连接获取,管理连接、连接状态的统计
type Pooler interface {
NewConn() (*Conn, error)
CloseConn(*Conn) error
Get() (*Conn, error)
Put(*Conn)
Remove(*Conn, error)
Len() int
IdleLen() int
Stats() *Stats
Close() error
}
type Options struct {
Dialer func() (net.Conn, error) // 如何建立连接函数
OnClose func(*Conn) error // 关闭连接时的回调函数
PoolSize int // 总连接数上限,默认值为 CPU 数量的 10 倍(非计算密集型应用)
MinIdleConns int // 最少空闲连接数
MaxConnAge time.Duration // 连接最大存活时间,默认 0
PoolTimeout time.Duration // 无可用连接的等待超时时间,默认 4s
IdleTimeout time.Duration // 连接空闲最大时长,默认 5min
IdleCheckFrequency time.Duration // 连接空闲检测周期,默认 1min
}
type ConnPool struct {
opt *Options
dialErrorsNum uint32 // atomic // 链接的错误数量
lastDialErrorMu sync.RWMutex // 实现安全操作 lastDialErrorMu
lastDialError error // 最后一个连接的错误
queue chan struct{} // 链接池令牌容量大小。
connsMu sync.Mutex // 实现 Get,Put 操作线程安全 conns []*Conn
idleConns []*Conn // 闲置连接队列,面向 Get,Put,reaper 操作
poolSize int // 链接池的长度
idleConnsLen int // 空闲连接的长度
stats Stats // 连接池统计信息
_closed uint32 // atomic 连接池关闭标记
}
初始化连接池主要是做了初始化一部分参数和新建一些空闲连接,起一个后台任务释放空闲连接
func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{
opt: opt,
queue: make(chan struct{}, opt.PoolSize), // 链接池的队列大小,ConnPool 用Channel来限制链接池的大小
conns: make([]*Conn, 0, opt.PoolSize), // 连接
idleConns: make([]*Conn, 0, opt.PoolSize), // 空闲连接
}
for i := 0; i < opt.MinIdleConns; i++ {
p.checkMinIdleConns() // 新建一些空闲连接,空闲连接的大小是不会记录在 queue 的长度里面的,所以,如果有设置最小空闲的连接,则会一开始就建立好连接
}
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency) //释放空闲连接
}
return p
}
func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
// 如果建立连接的错误数量大于链接池的数量,则直接返回错误,开始熔断,触发可用性探测
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
return nil, p.getLastDialError()
}
// 建立连接
netConn, err := p.opt.Dialer()
if err != nil {
p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
// 尝试建立连接,建立连接则会把 dialErrorsNum 赋值为0
go p.tryDial()
}
return nil, err
}
// 包装为自己的conn
cn := NewConn(netConn)
cn.pooled = pooled
return cn, nil
}
先从idleConns队列取 idle 连接,若实为 stale 连接则回收,若无可用的 idle 连接则穿透新建连接
func (p *ConnPool) Get() (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
// 获取queue令牌:往 p.queue 中写入一条数据,如果能够写入,则说明有空闲的连接
// 如果不能,则会等待 p.opt.PoolTimeout 时间,超过这个时间,则会返回 ErrPoolTimeout = errors.New("redis: connection pool timeout")
err := p.waitTurn()
if err != nil {
return nil, err
}
// 从空闲的连接中拿到一条连接,则直接返回,否则会使用 _NewConn 新建一条连接
for {
p.connsMu.Lock()
cn := p.popIdle()
p.connsMu.Unlock()
if cn == nil {
break
}
if p.isStaleConn(cn) {
_ = p.CloseConn(cn)
continue
}
atomic.AddUint32(&p.stats.Hits, 1)
return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
// true 表示这条连接会被放回连接池中,当连接池的大小>p.opt.PoolSize时,pooled 会被修改为 false,表示不会返回到连接池中
newcn, err := p._NewConn(true)
if err != nil {
p.freeTurn()
return nil, err
}
return newcn, nil
}
Put:完成命令请求并读取响应后,如果时需要放回连接池的,则将连接放回idleConns 队列,而且释放一个令牌
func (p *ConnPool) Put(cn *Conn) {
if !cn.pooled {
p.Remove(cn, nil)
return
}
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
p.connsMu.Unlock()
p.freeTurn()
}
Remove:请求超时后将连接从conns队列中移出(此连接先前已从idleConns出队)
func (p *ConnPool) Remove(cn *Conn, reason error) {
// 从连接池中移除连接
p.removeConn(cn)
p.freeTurn()
// 关闭连接
_ = p.closeConn(cn)
}
checkMinIdleConns 函数用来检测现有链接是否有配置的最小链接,如果没有,则会新建一个链接,放入池中
func (p *ConnPool) checkMinIdleConns() {
if p.opt.MinIdleConns == 0 {
return
}
if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
// 链接池个数加一
p.poolSize++
p.idleConnsLen++
// 新建立链接,addIdleConn 这个err 没有处理
//
go p.addIdleConn()
}
}
// 新建立链接
func (p *ConnPool) addIdleConn() {
cn, err := p.newConn(true)
// 新建链接错误返回
if err != nil {
return
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
p.idleConns = append(p.idleConns, cn)
p.connsMu.Unlock()
}
func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
cn, err := p.newConn(pooled)
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
if pooled {
if p.poolSize < p.opt.PoolSize {
p.poolSize++
} else {
cn.pooled = false
}
}
p.connsMu.Unlock()
return cn, nil
}
bug
**已经在:https://github.com/go-redis/redis/pull/1105 修复了。使用最新版本的 go-redis 没有这个问题// 连接池连接数量总数
func (p *ConnPool) Len() int {
p.connsMu.Lock()
n := len(p.conns)
p.connsMu.Unlock()
return n
}
// 连接池空闲连接数量
func (p *ConnPool) IdleLen() int {
p.connsMu.Lock()
n := p.idleConnsLen
p.connsMu.Unlock()
return n
}
func (p *ConnPool) Stats() *Stats {
idleLen := p.IdleLen()
return &Stats{
//Hits:连接池命中空闲连接次数
Hits: atomic.LoadUint32(&p.stats.Hits),
//Misses:连接池没有空闲连接可用次数
Misses: atomic.LoadUint32(&p.stats.Misses),
//Timeouts:请求连接等待超时次数
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
//TotalConns:连接池总连接数量
TotalConns: uint32(p.Len()),
//IdleConns:连接池空闲连接数量
IdleConns: uint32(idleLen),
//StaleConns:移除过期连接数量
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
}
通常,连接错误记录是读多写少的,所以采用读写锁来保证该记录的并发安全(读写锁在该场景下性能更佳)。
func (p *ConnPool) getLastDialError() error {
// 加锁
p.lastDialErrorMu.RLock()
err := p.lastDialError
p.lastDialErrorMu.RUnlock()
return err
}
func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
// allowTimeout = false 是否检测是否是网络超时的错误,不检查的话,超时错误的连接会被释放
if internal.IsBadConn(err, false) {
// 释放连接
c.connPool.Remove(cn, err)
} else {
// 放回到连接池中
c.connPool.Put(cn)
}
}
func IsBadConn(err error, allowTimeout bool) bool {
if err == nil {
return false
}
// 是 redis err
if IsRedisError(err) {
// #790
// 是 ReadOnly 的错误也会被释放连接
return IsReadOnlyError(err)
}
// 是否检测是否是网络超时的错误
if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
return false
}
}
return true
}
func IsRedisError(err error) bool {
_, ok := err.(proto.RedisError)
return ok
}
func IsReadOnlyError(err error) bool {
return strings.HasPrefix(err.Error(), "READONLY ")
}