接触过 Golang 并发编程的同学都知道,Golang 并发编程中的核心思想与其他语言,例如C、C++、Java等,不太一样。Golang 鼓励使用 channel 以 communication
的方式进行线程间交互。但是为了满足更多需求,gosdk 还是添加了 sync
包用来支持传统的以共享内存的方式进行线程交互。sync
包可以比作于 Java 中的 java.util.concurrent
包,里面包含了包括锁、池、线程屏障、原子类型等多种我们在并发编程中常用到的组件之外,还对一些原始的、不支持并发安全的数据结构添加了并发安全的支持。接下来,我们就讨论一下这些组件的使用方法(这部分文档只讨论 sync
包提供的API,不讨论内部原理,底部原理会在以后的文档中继续推出):
sync.WaitGroup
是我们最常用的并发组件之一。Go源码中是这么诠释 WaitGroup 的:
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
大致意思就是,一个 WaitGroup 会等待一组 goroutine 的结束。main goroutine 使用 WaitGroup 的 Add
方法为增加要等待完成的 goroutine 的数量,之后每一个 goroutine 在使用 Done
方法通知已结束。同时,Wait
方法可以将当前的 goroutine 阻塞到所有 goroutine 结束为止。
WaitGroup使用举例:
package main
import (
"fmt"
"sync"
)
func main() {
// 创建WaitGroup变量,可以直接使用,不需要进行其他初始化操作
var wg sync.WaitGroup
// 增加goroutine数量
wg.Add(10)
for i := 0; i < 10; i++ {
// 循环开启goroutine
go func(i int) {
// defer Done操作(通知goroutine结束)
defer wg.Done()
fmt.Printf("I am goroutine %d\n", i)
}(i)
}
// 阻塞到所有goroutine结束为止
wg.Wait()
}
➜ go-sync-package go run waitgroup.go
I am goroutine 8
I am goroutine 2
I am goroutine 5
I am goroutine 7
I am goroutine 3
I am goroutine 4
I am goroutine 6
I am goroutine 0
I am goroutine 9
I am goroutine 1
顾名思义,Once 就是执行且只执行一次。在并发编程中,可能会有一些只需要执行一次的操作,我们需要保护这种操作不能被多个线程多次进入。Once 的出现就解决了这种问题,一个 Once 对应一个函数,Once 会保证传入的这个函数只会被执行一次。Once 只提供了一个API:Do,源码中是这么解释 Do 操作的:
// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
这句话的大致意思就是,一个 Once 实例只能对应一个函数,Do 也只能被调用一次,如果你想定义多个函数只被执行一次,就需要实例化多个 Once。
Once 使用举例:
package main
import (
"fmt"
"sync"
)
// 将Once直接作为全局变量创建
var once sync.Once
func main() {
// 利用WaitGroup阻塞main到所有goroutine结束
var wg sync.WaitGroup
// WaitGroup增加线程数量
wg.Add(10)
// 循环启动goroutine
for i := 0; i < 10; i++ {
go Init(&wg)
}
// 阻塞
wg.Wait()
}
func Init(wg *sync.WaitGroup) {
// defer 通知完成
defer wg.Done()
// 调用 Do 函数
once.Do(func() {
fmt.Println("initiated!!!")
})
}
在这我们定义了一函数 Init
假设其是我们的初始化函数,其利用 once.Do
执行初始化,这里的 once
是在全局中定义的。在启动10 goroutine 同时执行 Init
,再利用之前学到的 WaitGroup
的方式等待这10个 goroutine的结束。输出结果展示,就算我启动了10个 goroutine 去调用 Init
方法,到头来 once.Do
中的函数只被执行了一次:
➜ go-sync-package go run once.go
initiated!!!
sync/mutex.go
中定义了 Locker
接口,该接口包含了两个方法 Lock
和 Unlock
。后续的 Mutex
、RWMutex
等都实现了该接口:
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
互斥锁,实现了呢 Locker
接口,意味着其具有 Lock
和 Unlock
方法。使用该两个方法可以对代码块加锁。Lock
必须被一个 Unlock
跟随。大多情况下如果需要锁一整个代码块,使用 defer
去 Unlock
会是不错的选择。
下面再次使用多个 goroutine 对同一个变量进行加一操作的例子进行展示(对加一操作利用 Mutex
进行加锁):
package main
import (
"fmt"
"sync"
)
func main() {
// 创建Mutex和WaitGroup
var mu sync.Mutex
var wg sync.WaitGroup
// 变量
count := 0
// 增加线程数量为1000
wg.Add(1000)
// 循环启动1000个goroutine
for i := 0; i < 1000; i++ {
go func() {
// 加锁
mu.Lock()
// defer 通知结束
defer wg.Done()
// defer 释放锁
defer mu.Unlock()
count++
}()
}
// 阻塞
wg.Wait()
fmt.Printf("count = %d\n", count)
}
启动1000个 goroutine 去执行一个被同一个 mutex 进行加锁的对 count 进行加一操作的函数,最后输出:
➜ go-sync-package go run mutex.go
count = 1000
RWMutex 是 sync 包中实现了 Locker
接口的另一个结构体。它跟 Mutex
的区别在于 RWMutex
即可以实现互斥锁也可以实现共享锁,也就是我们一般提到的 “写锁” 和 “读锁”,RWMutex
全称也应该是 Read Write Mutex。RWMutex
中的 Lock
和 Unlock
方法实现了 写加锁 和 写释放锁,同时 RWMutex
还有两个区别于 Mutex
的方法 RLock
和 RUnlock
,该两个方法实现了 读加锁 和 读释放锁。RWMutex
实现了传统意义上的:写写互斥、读写互斥、读读共享的模式。使用方法几乎跟 Mutex
相似。
RWMutex
还有一个很特别的方法叫做 RLocker
,其返回一个实现了 locker
接口的对象。rwmutex.go
下定义了一个新的类型 rLocker
:
type rlocker RWMutex
它又重写了 RWMutex
的 Lock
和 Unlock
方法。其内部操作就是将 rLocker
类型的变量转化成 RWMutex
并调用其 RLock
和 RUnlock
方法。
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
return (*rlocker)(rw)
}
type rlocker RWMutex
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
你会感觉这个方法有点多余,为什么不直接自己调用 RLock
和 RUnlock
方法呢,还要获取新的类型的变量干嘛?!这里就能体现出 go 对接口和结构体间的联系的设计思想。我们知道 RWMutex
是一个 Locker
,一个 Locker
就应该只包含 Lock
和 Unlock
的方法,但是 RWMutex
的加锁和释放锁各有两种方式,所以开发者将 RWMutex
的 Lock
和 Unlock
方法赋给了写锁,为读锁又重新了编写了一个类型,该类型就可以直接使用 Lock
和 Unlock
方法去操作 RWMutex
的读锁。
sync.Cond
就像是组合了一个泛型的 channel
和一个 Mutex
。你从 channel
中获取到一个消息,但你并不知道这个消息具体是什么,这时你就锁上 Mutex
去探索到底发生了什么。假设你有一个 goroutine 需要等待100用户登录,然后再为前10个人发放奖金。最愚蠢的方法会是:
for {
mu.Lock()
if len(users) >= 100 {
givePrizes(users[:10])
mu.Unlock()
return
}
mu.Unlock()
}
这种方法不断地加锁、释放锁占用大量的CPU时间,同时这种方法检查条件的次数远大于条件变更的速度,这就导致这种方法非常低效且耗费资源。
如果我们能让 goroutine 一直 sleep 到 len(users) 变化为止,然后再去检查条件那该多好!我们可以使用 channel 实现这种逻辑:
mu.Lock()
users = append(users, newUser)
mu.Unlock()
select {
// 假设登录操作变化users以后,会对ch进行写入操作
// 这里就会一直堵塞到users变化为止
case ch <- struct{}{}:
default:
}
这种方法显著地减少了我们加锁和释放锁的次数,如果用户只有登录操作(也就是说users只会增大),那么我们加锁和释放锁的次数就刚好为100次。这种方法虽然有效,但是需要一些多余的操作:对channel的读写。
sync.Cond
打包了这种操作,如果使用 sync.Cond
去执行该操作:
cond.L.Lock()
for len(users) < 100 {
cond.Wait()
}
givePrizes(users[:10])
cond.L.Unlock()
// 执行登录操作的goroutine:
cond.L.Lock()
users = append(users, newUser)
cond.L.Unlock()
cond.Signal() // 唤醒一个等待当前Cond的goroutine
这是一个效率上的少许提升,但是这也并不是我们最终想要的效果,我们最终想要让 goroutine 一直阻塞到 len(users) == 100 时启动,但是为了实现这种效果,我们需要将条件转移到发送者一方,也就是说执行登录的 goroutine 去维护条件,等条件满足再去唤醒其他 goroutine。这种操作理论上是可以的,但是它有时会违反一些设计模式,“做登录操作的 goroutine 不应该知道关于奖金的条件”。
sync.Cond
除了上述的方法之外还有一个方法叫做 Broadcast
。假设我们有多个 goroutine 都在等待不一样的事件的发生,与其利用 Signal
的方法去一个一个唤醒,我们可以直接利用 Broadcast
唤醒所有 goroutine。
sync.Cond
的用法举例:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var (
loggedInUsers []string // 已登录用户
stop bool // 是否停止
locker sync.Mutex // sync.Cond 所需的 locker
wg sync.WaitGroup // 利用 sync.WaitGroup 等待线程结束
)
// 构造 sync.Cond
cond := sync.NewCond(&locker)
// 增加线程数
wg.Add(3)
// 此 goroutine 负责模拟登录(每过500ms登录一次)并唤醒单个goroutine
// 最后将 stop 设为 true,并唤醒所有 goroutine
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
time.Sleep(500 * time.Millisecond)
cond.L.Lock()
user := fmt.Sprintf("user_%d", i)
loggedInUsers = append(loggedInUsers, user)
fmt.Printf("user %s has logged in\n", user)
cond.L.Unlock()
cond.Signal()
}
cond.L.Lock()
stop = true
cond.L.Unlock()
cond.Broadcast()
}()
// 此 goroutine 首先等待登录用户数达到五,输出结果,之后等待 stop 被设为 true
go func() {
defer wg.Done()
cond.L.Lock()
for len(loggedInUsers) < 5 && !stop {
cond.Wait()
}
fmt.Printf("logged in user count has exceeded 5\n")
cond.L.Unlock()
cond.L.Lock()
for !stop {
cond.Wait()
}
fmt.Printf("goroutine 1 has stopped\n")
cond.L.Unlock()
}()
// 此 goroutine 等待 stop 被设为true
go func() {
defer wg.Done()
cond.L.Lock()
for !stop {
cond.Wait()
}
fmt.Printf("goroutine 2 has stopped\n")
cond.L.Unlock()
}()
wg.Wait()
}
Pool
直译 池
。其可以作为一个可以临时保存并取还对象的 池子
。对于需要很多重复分配、回收内存的情况下 sync.Pool
是一个很好的选择。sync.Pool
可以帮我们将暂时不用的对象缓存起来,待我们下次使用的时候取出。sync.Pool
是并发安全的,对于多个 goroutine
同时创建对象或取出缓存对象是完全ok的。文档中是这么介绍 sync.Pool
的:
// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.
最典型的使用就是 fmt
包中的 newPrinter()
函数:
在 golang 编程中大家经常使用的 fmt.Print
、fmt.Printf
、fmt.Println
都会被被转到 FPrint
系列函数并传入 os.Stdout
(一点题外内容)。那我们选一个比较典型的 fmt.FPrintf
来看内容:
func Fprintf(w io.Writer, format string, a ...any) (n int, err error) {
p := newPrinter()
p.doPrintf(format, a)
n, err = w.Write(p.buf)
p.free()
return
}
Fprintf
在第一行使用了 newPrinter
函数获取了 pp
类型的对象,我们深入到 newPrinter
当中:
// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
p := ppFree.Get().(*pp)
p.panicking = false
p.erroring = false
p.wrapErrs = false
p.fmt.init(&p.buf)
return p
}
再次注意第一行,其使用了 ppFree
的 Get
函数获取了一个对象并将其转换成了 pp
类型的指针。而这里的 ppFree
就是 sync.Pool
的实例:
var ppFree = sync.Pool{
New: func() any { return new(pp) },
}
fmt
包利用这种方法复用了 pp
(其实是复用了 pp
中的 buffer
),来减少内存的重复分配和GC的压力。
sync.Pool
的设计也是非常完美的,在使用时我们只会接触到三个函数 New
、Put
和 Get
。
sync.Pool
时需要提供 New
函数,其作用是在 Pool 中对象不够时新建对象;Put
函数一般都会在对象被使用后归还时调用。Get
函数就是在获取对象时使用。go 内嵌的 map 类型并不是并发安全的,如果需要并发安全的 map 我们有两种选择:
go 官方文档中是这么介绍 sync.Map
的:
// Map is like a Go map[interface{}]interface{} but is safe for concurrent use
// by multiple goroutines without additional locking or coordination.
// Loads, stores, and deletes run in amortized constant time.
//
// The Map type is specialized. Most code should use a plain Go map instead,
// with separate locking or coordination, for better type safety and to make it
// easier to maintain other invariants along with the map content.
//
// The Map type is optimized for two common use cases: (1) when the entry for a given
// key is only ever written once but read many times, as in caches that only grow,
// or (2) when multiple goroutines read, write, and overwrite entries for disjoint
// sets of keys. In these two cases, use of a Map may significantly reduce lock
// contention compared to a Go map paired with a separate Mutex or RWMutex.
sync.Map
的使用不需要进行额外的加锁就能保证并发安全。但是多数情况下 go 官方还是推荐使用 buildtin map 类型。sync.Map
适合在以下两种情况下使用:
以上两种情况下,sync.Map
的使用效率可能会相比于 builtin map 加 Mutex 明显地提高很多。
这是因为在 sync.Map
的读效率非常高的同时,其写入效率会非常低,大多情况下会低于 builtin map + Mutex 的方式,Map测试文档引用:The power of Go concurrent reading and writing sync.map
sync.Map
中公开的 API 比较多:
gosdk 中的 sync
包大多只是为了一些特殊情况而设计的,在平时的编程中还是鼓励大家使用 golang 推崇的 CSP
编程模式,使用 channel
+ goroutine
的方式设计并发模型。sync
包中还有一部分很重要的内容即 atomic
包,其内部包含很多 CAS
相关内容,有机会将在以后的文章中继续与大家分享。