作为一名后台开发人员,经常需要用到连接池。这里以 nanomsg 为例,实现一个简单的连接池。
package nanomsg
import (
"sync"
"git.cdsdjx.net/sdjx/gdserver/common/setting"
"git.cdsdjx.net/sdjx/gdserver/logs"
"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/sub"
)
// ConnRes is the minimal connection contract the pool works with: anything
// that can receive a message and be closed can be pooled.
type ConnRes interface {
	// Recv returns the bytes of one received message, or an error.
	// Blocking/timeout behavior is defined by the implementation.
	Recv() ([]byte, error)

	// Close releases the underlying connection.
	Close() error
}
// Factory builds one new connection for the pool. It has no error return;
// the factory installed by GetSub signals failure by returning nil.
type Factory func() ConnRes
// Pool is a connection pool backed by a buffered channel.
type Pool struct {
	// conns buffers the pooled connections; NewPool creates it with a
	// capacity of maxConn.
	conns chan ConnRes

	// factory is called whenever the pool needs a new connection.
	factory Factory

	// maxConn is the number of connections the pool is sized for.
	maxConn int
}
// Package-level state for the lazily created subscriber pool.
var (
	// oncePool guards the one-time construction of pool in GetSub.
	oncePool sync.Once

	// pool is the shared subscriber pool; it stays nil until GetSub has
	// run its initialization.
	pool *Pool
)
// NewPool returns an empty pool that can buffer up to maxConn connections
// produced by factory. No connection is created here; the pool is filled
// lazily on first use.
//
// A negative maxConn is treated as 0 instead of panicking inside make. With
// a capacity of 0 nothing can be buffered, so every connection comes
// straight from factory.
func NewPool(factory Factory, maxConn int) *Pool {
	if maxConn < 0 {
		// make(chan T, n) panics for n < 0, e.g. on a misconfigured setting.
		maxConn = 0
	}
	return &Pool{
		conns:   make(chan ConnRes, maxConn),
		factory: factory,
		maxConn: maxConn,
	}
}
// new tops the pool up with connections from the factory (at most maxConn
// attempts) and then returns one further connection that is NOT placed in
// the pool. The returned value is whatever the factory yields, so it is nil
// when the factory fails.
//
// Filling never blocks: it stops as soon as the channel is full, so calling
// new on a partially filled pool (for example from concurrent callers)
// cannot deadlock. Nil connections from a failed factory call are skipped
// rather than cached.
func (p *Pool) new() ConnRes {
	for i := 0; i < p.maxConn; i++ {
		if len(p.conns) >= cap(p.conns) {
			// Already full; creating more would only produce surplus sockets.
			break
		}
		conn := p.factory()
		if conn == nil {
			// Factory failed; caching nil would hand it to a later caller.
			continue
		}
		select {
		case p.conns <- conn:
		default:
			// Lost a race for the last free slot: release the surplus
			// connection. The Close error is ignored because the connection
			// was never handed to anyone.
			_ = conn.Close()
		}
	}
	return p.factory()
}
// Get returns a connection from the pool. Pooled connections are shared,
// not checked out: the connection taken from the channel is put straight
// back before being returned, so later callers can receive the same one.
// When the channel is empty, Get delegates to new and returns its result,
// which may be nil if the factory fails.
//
// Nil entries found in the channel are discarded instead of being returned
// and requeued. If the freed slot has been taken by someone else in the
// meantime, the connection is returned unpooled but still open; it is never
// closed before being handed to the caller.
func (p *Pool) Get() (conn ConnRes) {
	for {
		select {
		case conn = <-p.conns:
			if conn == nil {
				// A cached failure: drop it and look at the next entry.
				continue
			}
			select {
			case p.conns <- conn:
			default:
				// Slot was taken concurrently; keep the connection usable
				// for this caller rather than closing it.
			}
			return conn
		default:
			return p.new()
		}
	}
}
// Put offers conn to the pool without blocking. If the pool has a free slot
// the connection is stored; otherwise it is closed.
//
// A nil conn is ignored. Storing it would hand nil to a later Get, and
// closing it would panic.
func (p *Pool) Put(conn ConnRes) {
	if conn == nil {
		return
	}
	select {
	case p.conns <- conn:
	default:
		// Pool is full: release the surplus connection. Put has no error
		// return, so the Close error is dropped, as it was before.
		_ = conn.Close()
	}
}
// GetSub returns a subscriber connection from the package-wide pool. The
// pool is built once, on the first call: setting.Setting() is invoked
// (presumably to load configuration; confirm in the setting package) and
// the pool is sized from setting.NanomsgMixConn.
//
// The factory creates a SUB socket, dials setting.NanomsgPubsubUrl and
// subscribes with an empty topic, which in mangos matches every message.
// Each step logs and returns nil on failure, so GetSub can return nil and
// callers must check for it. If Dial or SetOption fails, the socket that
// was just created is closed so it does not leak.
//
// NOTE(review): the factory assigns to SockSub, which is declared outside
// this function, so every factory call overwrites that shared variable.
func GetSub() (conn ConnRes) {
	oncePool.Do(func() {
		setting.Setting()
		pool = NewPool(func() ConnRes {
			var err error
			if SockSub, err = sub.NewSocket(); err != nil {
				logs.Error("创建socket失败: %s", err.Error())
				return nil
			}
			if err = SockSub.Dial(setting.NanomsgPubsubUrl); err != nil {
				logs.Error("连接socket失败: %s", err.Error())
				// The socket exists but is unusable; release it. The Close
				// error is dropped because the Dial failure is already logged.
				_ = SockSub.Close()
				return nil
			}
			if err = SockSub.SetOption(mangos.OptionSubscribe, []byte("")); err != nil {
				logs.Error("订阅失败: %s", err.Error())
				// Same cleanup as above for a socket that cannot subscribe.
				_ = SockSub.Close()
				return nil
			}
			return SockSub
		}, setting.NanomsgMixConn)
	})
	return pool.Get()
}