sync - 处理同步需求
golang是一门语言级别支持并发的程序语言。golang中使用go语句来开启一个新的协程。 goroutine是非常轻量的,除了给它分配栈空间,它所占用的内存空间是微乎其微的。
但当多个goroutine同时进行处理的时候,就会遇到比如同时抢占一个资源,某个goroutine等待另一个goroutine处理完某一个步骤之后才能继续的需求。 在golang的官方文档上,作者明确指出,golang并不希望依靠共享内存的方式进行进程的协同操作。而是希望通过管道channel的方式进行。 当然,golang也提供了共享内存,锁,等机制进行协同操作的包。sync包就是为了这个目的而出现的。
锁
sync包中定义了Locker结构来代表锁。
type Locker interface {
Lock()
Unlock()
}
并且创造了两个结构来实现Locker接口:Mutex 和 RWMutex。
Mutex就是互斥锁,互斥锁代表着当数据被加锁了之后,除了加锁的程序,其他程序不能对数据进行读操作和写操作。 这个当然能解决并发程序对资源的操作。但是,效率上是个问题。当加锁后,其他程序要读取操作数据,就只能进行等待了。 这个时候就需要使用读写锁。
读写锁分为读锁和写锁,读数据的时候上读锁,写数据的时候上写锁。有写锁的时候,数据不可读不可写。有读锁的时候,数据可读,不可写。 互斥锁就不举例子,读写锁可以看下面的例子:
package main
import (
"sync"
"time"
)
var m *sync.RWMutex
var val = 0
func main() {
m = new(sync.RWMutex)
go read(1)
go write(2)
go read(3)
time.Sleep(5 * time.Second)
}
func read(i int) {
m.RLock()
time.Sleep(1 * time.Second)
println("val: ", val)
time.Sleep(1 * time.Second)
m.RUnlock()
}
func write(i int) {
m.Lock()
val = 10
time.Sleep(1 * time.Second)
m.Unlock()
}
返回:
val: 0
val: 10
但是如果我们把read中的RLock和RUnlock两个函数给注释了,就返回了:
val: 10
val: 10
这个就是由于读的时候没有加读锁,在准备读取val的时候,val被write函数进行修改了。
临时对象池
当多个goroutine都需要创建同一个对象的时候,如果goroutine过多,可能导致对象的创建数目剧增。 而对象又是占用内存的,进而导致的就是内存回收的GC压力徒增。造成“并发大-占用内存大-GC缓慢-处理并发能力降低-并发更大”这样的恶性循环。 在这个时候,我们非常迫切需要有一个对象池,每个goroutine不再自己单独创建对象,而是从对象池中获取出一个对象(如果池中已经有的话)。 这就是sync.Pool出现的目的了。
sync.Pool的使用非常简单,提供两个方法:Get和Put 和一个初始化回调函数New。
看下面这个例子(取自gomemcache):
// keyBufPool returns []byte buffers for use by PickServer's call to
// crc32.ChecksumIEEE to avoid allocations. (but doesn't avoid the
// copies, which at least are bounded in size and small)
var keyBufPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 256)
return &b
},
}
func (ss *ServerList) PickServer(key string) (net.Addr, error) {
ss.mu.RLock()
defer ss.mu.RUnlock()
if len(ss.addrs) == 0 {
return nil, ErrNoServers
}
if len(ss.addrs) == 1 {
return ss.addrs[0], nil
}
bufp := keyBufPool.Get().(*[]byte)
n := copy(*bufp, key)
cs := crc32.ChecksumIEEE((*bufp)[:n])
keyBufPool.Put(bufp)
return ss.addrs[cs%uint32(len(ss.addrs))], nil
}
这是实际项目中的一个例子,这里使用keyBufPool的目的是为了让crc32.ChecksumIEEE所使用的[]bytes数组可以重复使用,减少GC的压力。
但是这里可能会有一个问题,我们没有看到Pool的手动回收函数。 那么是不是就意味着,如果我们的并发量不断增加,这个Pool的体积会不断变大,或者一直维持在很大的范围内呢?
答案是不会的,sync.Pool的回收是有的,它是在系统自动GC的时候,触发pool.go中的poolCleanup函数。
func poolCleanup() {
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
这个函数会把Pool中所有goroutine创建的对象都进行销毁。
那这里另外一个问题也凸显出来了,很可能我上一步刚往pool中PUT一个对象之后,下一步GC触发,导致pool的GET函数获取不到PUT进去的对象。 这个时候,GET函数就会调用New函数,临时创建出一个对象,并存放到pool中。
根据以上结论,sync.Pool其实不适合用来做持久保存的对象池(比如连接池)。它更适合用来做临时对象池,目的是为了降低GC的压力。
连接池性能测试
package main
import (
"sync"
"testing"
)
var bytePool = sync.Pool{
New: newPool,
}
func newPool() interface{} {
b := make([]byte, 1024)
return &b
}
func BenchmarkAlloc(b *testing.B) {
for i := 0; i < b.N; i++ {
obj := make([]byte, 1024)
_ = obj
}
}
func BenchmarkPool(b *testing.B) {
for i := 0; i < b.N; i++ {
obj := bytePool.Get().(*[]byte)
_ = obj
bytePool.Put(obj)
}
}
文件目录下执行 go test -bench .
E:\MyGo\sync>go test -bench .
testing: warning: no tests to run
PASS
BenchmarkAlloc-4 50000000 39.3 ns/op
BenchmarkPool-4 50000000 25.4 ns/op
ok _/E_/MyGo/sync 3.345s
通过性能测试可以清楚地看到,使用连接池消耗的CPU时间远远小于每次手动分配内存。
Once
有的时候,我们多个goroutine都要过一个操作,但是这个操作我只希望被执行一次,这个时候Once就上场了。比如下面的例子:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
}()
}
time.Sleep(3e9)
}
只会打出一次"Only once"。
WaitGroup 和 Cond
一个goroutine需要等待一批goroutine执行完毕以后才继续执行,那么这种多线程等待的问题就可以使用WaitGroup了。
package main
import (
"fmt"
"sync"
)
func main() {
wp := new(sync.WaitGroup)
wp.Add(10);
for i := 0; i < 10; i++ {
go func() {
fmt.Println("done ", i)
wp.Done()
}()
}
wp.Wait()
fmt.Println("wait end")
}
还有个sync.Cond是用来控制某个条件下,goroutine进入等待时期,等待信号到来,然后重新启动。比如:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
locker := new(sync.Mutex)
cond := sync.NewCond(locker)
done := false
cond.L.Lock()
go func() {
time.Sleep(2e9)
done = true
cond.Signal()
}()
if (!done) {
cond.Wait()
}
fmt.Println("now done is ", done);
}
这里当主goroutine进入cond.Wait的时候,就会进入等待,当从goroutine发出信号之后,主goroutine才会继续往下面走。
sync.Cond还有一个BroadCast方法,用来通知唤醒所有等待的gouroutine。
package main
import (
"fmt"
"sync"
"time"
)
var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
func test(x int) {
cond.L.Lock() // 获取锁
cond.Wait() // 等待通知 暂时阻塞
fmt.Println(x)
time.Sleep(time.Second * 1)
cond.L.Unlock() // 释放锁,不释放的话将只会有一次输出
}
func main() {
for i := 0; i < 40; i++ {
go test(i)
}
fmt.Println("start all")
cond.Broadcast() // 下发广播给所有等待的goroutine
time.Sleep(time.Second * 60)
}
主gouroutine开启后,可以创建多个从gouroutine,从gouroutine获取锁后,进入cond.Wait状态,当主gouroutine执行完任务后,通过BroadCast广播信号。 处于cond.Wait状态的所有gouroutine收到信号后将全部被唤醒并往下执行。需要注意的是,从gouroutine执行完任务后,需要通过cond.L.Unlock释放锁, 否则其它被唤醒的gouroutine将没法继续执行。 通过查看cond.Wait 的源码就明白为什么需要需要释放锁了
func (c *Cond) Wait() {
c.checker.check()
if raceenabled {
raceDisable()
}
atomic.AddUint32(&c.waiters, 1)
if raceenabled {
raceEnable()
}
c.L.Unlock()
runtime_Syncsemacquire(&c.sema)
c.L.Lock()
}
Cond.Wait会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的Wait将继续往下执行并从新上锁,如果不释放锁, 其它收到信号的gouroutine将阻塞无法继续执行。 由于各个Wait收到信号的时间是不确定的,因此每次的输出顺序也都是随机的。