当前位置: 首页 > 工具软件 > Golang-Pool > 使用案例 >

Golang - sync包的使用 (WaitGroup, Once, Mutex, RWMutex, Cond, Pool, Map)

濮赤岩
2023-12-01

Golang — sync包的使用

接触过 Golang 并发编程的同学都知道,Golang 并发编程中的核心思想与其他语言,例如C、C++、Java等,不太一样。Golang 鼓励使用 channel 以 communication 的方式进行线程间交互。但是为了满足更多需求,gosdk 还是添加了 sync 包用来支持传统的以共享内存的方式进行线程交互。sync 包可以比作于 Java 中的 java.util.concurrent 包,里面包含了包括锁、池、线程屏障、原子类型等多种我们在并发编程中常用到的组件之外,还对一些原始的、不支持并发安全的数据结构添加了并发安全的支持。接下来,我们就讨论一下这些组件的使用方法(这部分文档只讨论 sync 包提供的API,不讨论内部原理,底部原理会在以后的文档中继续推出):

WaitGroup

sync.WaitGroup 是我们最常用的并发组件之一。Go源码中是这么诠释 WaitGroup 的:

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.

大致意思就是,一个 WaitGroup 会等待一组 goroutine 的结束。main goroutine 使用 WaitGroup 的 Add 方法为增加要等待完成的 goroutine 的数量,之后每一个 goroutine 在使用 Done 方法通知已结束。同时,Wait 方法可以将当前的 goroutine 阻塞到所有 goroutine 结束为止。

WaitGroup使用举例:

package main

import (
	"fmt"
	"sync"
)

func main() {
  // 创建WaitGroup变量,可以直接使用,不需要进行其他初始化操作
	var wg sync.WaitGroup
  // 增加goroutine数量
	wg.Add(10)
	for i := 0; i < 10; i++ {
    // 循环开启goroutine
		go func(i int) {
      // defer Done操作(通知goroutine结束)
			defer wg.Done()
			fmt.Printf("I am goroutine %d\n", i)
		}(i)
	}
  // 阻塞到所有goroutine结束为止
	wg.Wait()
}
➜  go-sync-package go run waitgroup.go 
I am goroutine 8
I am goroutine 2
I am goroutine 5
I am goroutine 7
I am goroutine 3
I am goroutine 4
I am goroutine 6
I am goroutine 0
I am goroutine 9
I am goroutine 1

Once

顾名思义,Once 就是执行且只执行一次。在并发编程中,可能会有一些只需要执行一次的操作,我们需要保护这种操作不能被多个线程多次进入。Once 的出现就解决了这种问题,一个 Once 对应一个函数,Once 会保证传入的这个函数只会被执行一次。Once 只提供了一个API:Do,源码中是这么解释 Do 操作的:

// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// 	var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.

这句话的大致意思就是,一个 Once 实例只能对应一个函数,Do 也只能被调用一次,如果你想定义多个函数只被执行一次,就需要实例化多个 Once。

Once 使用举例:

package main

import (
	"fmt"
	"sync"
)

// 将Once直接作为全局变量创建
var once sync.Once

func main() {
  // 利用WaitGroup阻塞main到所有goroutine结束
  var wg sync.WaitGroup
  // WaitGroup增加线程数量
	wg.Add(10)
  // 循环启动goroutine
	for i := 0; i < 10; i++ {
		go Init(&wg)
	}
  // 阻塞
	wg.Wait()
}

func Init(wg *sync.WaitGroup) {
  // defer 通知完成
	defer wg.Done()
  // 调用 Do 函数
	once.Do(func() {
		fmt.Println("initiated!!!")
	})
}

在这我们定义了一函数 Init 假设其是我们的初始化函数,其利用 once.Do 执行初始化,这里的 once 是在全局中定义的。在启动10 goroutine 同时执行 Init ,再利用之前学到的 WaitGroup 的方式等待这10个 goroutine的结束。输出结果展示,就算我启动了10个 goroutine 去调用 Init 方法,到头来 once.Do 中的函数只被执行了一次:

➜  go-sync-package go run once.go 
initiated!!!

Locker

sync/mutex.go 中定义了 Locker 接口,该接口包含了两个方法 LockUnlock。后续的 MutexRWMutex 等都实现了该接口:

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

Mutex

互斥锁,实现了呢 Locker 接口,意味着其具有 LockUnlock 方法。使用该两个方法可以对代码块加锁。Lock 必须被一个 Unlock 跟随。大多情况下如果需要锁一整个代码块,使用 deferUnlock 会是不错的选择。

下面再次使用多个 goroutine 对同一个变量进行加一操作的例子进行展示(对加一操作利用 Mutex 进行加锁):

package main

import (
	"fmt"
	"sync"
)

func main() {
  // 创建Mutex和WaitGroup
	var mu sync.Mutex
	var wg sync.WaitGroup
  // 变量
	count := 0
  // 增加线程数量为1000
	wg.Add(1000)
  // 循环启动1000个goroutine
	for i := 0; i < 1000; i++ {
		go func() {
      // 加锁
			mu.Lock()
      // defer 通知结束
			defer wg.Done()
      // defer 释放锁
			defer mu.Unlock()
			count++
		}()
	}
  // 阻塞
	wg.Wait()
	fmt.Printf("count = %d\n", count)
}

启动1000个 goroutine 去执行一个被同一个 mutex 进行加锁的对 count 进行加一操作的函数,最后输出:

➜  go-sync-package go run mutex.go
count = 1000

RWMutex

RWMutex 是 sync 包中实现了 Locker 接口的另一个结构体。它跟 Mutex 的区别在于 RWMutex 即可以实现互斥锁也可以实现共享锁,也就是我们一般提到的 “写锁” 和 “读锁”,RWMutex 全称也应该是 Read Write Mutex。RWMutex 中的 LockUnlock 方法实现了 写加锁写释放锁,同时 RWMutex 还有两个区别于 Mutex 的方法 RLockRUnlock,该两个方法实现了 读加锁读释放锁RWMutex 实现了传统意义上的:写写互斥、读写互斥、读读共享的模式。使用方法几乎跟 Mutex 相似。

RWMutex 还有一个很特别的方法叫做 RLocker,其返回一个实现了 locker 接口的对象。rwmutex.go 下定义了一个新的类型 rLocker

type rlocker RWMutex

它又重写了 RWMutexLockUnlock 方法。其内部操作就是将 rLocker 类型的变量转化成 RWMutex 并调用其 RLockRUnlock 方法。

// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
	return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

你会感觉这个方法有点多余,为什么不直接自己调用 RLockRUnlock 方法呢,还要获取新的类型的变量干嘛?!这里就能体现出 go 对接口和结构体间的联系的设计思想。我们知道 RWMutex 是一个 Locker,一个 Locker 就应该只包含 LockUnlock 的方法,但是 RWMutex 的加锁和释放锁各有两种方式,所以开发者将 RWMutexLockUnlock 方法赋给了写锁,为读锁又重新了编写了一个类型,该类型就可以直接使用 LockUnlock 方法去操作 RWMutex 的读锁。

Cond

sync.Cond 就像是组合了一个泛型的 channel 和一个 Mutex。你从 channel 中获取到一个消息,但你并不知道这个消息具体是什么,这时你就锁上 Mutex 去探索到底发生了什么。假设你有一个 goroutine 需要等待100用户登录,然后再为前10个人发放奖金。最愚蠢的方法会是:

for {
    mu.Lock()
    if len(users) >= 100 {
        givePrizes(users[:10])
        mu.Unlock()
        return
    }
    mu.Unlock()
}

这种方法不断地加锁、释放锁占用大量的CPU时间,同时这种方法检查条件的次数远大于条件变更的速度,这就导致这种方法非常低效且耗费资源。

如果我们能让 goroutine 一直 sleep 到 len(users) 变化为止,然后再去检查条件那该多好!我们可以使用 channel 实现这种逻辑:

mu.Lock()
users = append(users, newUser)
mu.Unlock()
select {
// 假设登录操作变化users以后,会对ch进行写入操作
// 这里就会一直堵塞到users变化为止
case ch <- struct{}{}:
default:
}

这种方法显著地减少了我们加锁和释放锁的次数,如果用户只有登录操作(也就是说users只会增大),那么我们加锁和释放锁的次数就刚好为100次。这种方法虽然有效,但是需要一些多余的操作:对channel的读写。

sync.Cond 打包了这种操作,如果使用 sync.Cond 去执行该操作:

cond.L.Lock()
for len(users) < 100 {
    cond.Wait()
}
givePrizes(users[:10])
cond.L.Unlock()

// 执行登录操作的goroutine:
cond.L.Lock()
users = append(users, newUser)
cond.L.Unlock()
cond.Signal() // 唤醒一个等待当前Cond的goroutine

这是一个效率上的少许提升,但是这也并不是我们最终想要的效果,我们最终想要让 goroutine 一直阻塞到 len(users) == 100 时启动,但是为了实现这种效果,我们需要将条件转移到发送者一方,也就是说执行登录的 goroutine 去维护条件,等条件满足再去唤醒其他 goroutine。这种操作理论上是可以的,但是它有时会违反一些设计模式,“做登录操作的 goroutine 不应该知道关于奖金的条件”。

sync.Cond 除了上述的方法之外还有一个方法叫做 Broadcast。假设我们有多个 goroutine 都在等待不一样的事件的发生,与其利用 Signal 的方法去一个一个唤醒,我们可以直接利用 Broadcast 唤醒所有 goroutine。

sync.Cond 的用法举例:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var (
		loggedInUsers []string			// 已登录用户
		stop          bool				// 是否停止
		locker        sync.Mutex		// sync.Cond 所需的 locker
		wg            sync.WaitGroup	// 利用 sync.WaitGroup 等待线程结束
	)
	// 构造 sync.Cond
	cond := sync.NewCond(&locker)
	// 增加线程数
	wg.Add(3)
	// 此 goroutine 负责模拟登录(每过500ms登录一次)并唤醒单个goroutine
	// 最后将 stop 设为 true,并唤醒所有 goroutine
	go func() {
		defer wg.Done()
		for i := 0; i < 10; i++ {
			time.Sleep(500 * time.Millisecond)
			cond.L.Lock()
			user := fmt.Sprintf("user_%d", i)
			loggedInUsers = append(loggedInUsers, user)
			fmt.Printf("user %s has logged in\n", user)
			cond.L.Unlock()
			cond.Signal()
		}
		cond.L.Lock()
		stop = true
		cond.L.Unlock()
		cond.Broadcast()
	}()
	// 此 goroutine 首先等待登录用户数达到五,输出结果,之后等待 stop 被设为 true
	go func() {
		defer wg.Done()
		cond.L.Lock()
		for len(loggedInUsers) < 5 && !stop {
			cond.Wait()
		}
		fmt.Printf("logged in user count has exceeded 5\n")
		cond.L.Unlock()
		cond.L.Lock()
		for !stop {
			cond.Wait()
		}
		fmt.Printf("goroutine 1 has stopped\n")
		cond.L.Unlock()
	}()
	// 此 goroutine 等待 stop 被设为true
	go func() {
		defer wg.Done()
		cond.L.Lock()
		for !stop {
			cond.Wait()
		}
		fmt.Printf("goroutine 2 has stopped\n")
		cond.L.Unlock()
	}()
	wg.Wait()
}

Pool

Pool 直译 。其可以作为一个可以临时保存并取还对象的 池子。对于需要很多重复分配、回收内存的情况下 sync.Pool 是一个很好的选择。sync.Pool 可以帮我们将暂时不用的对象缓存起来,待我们下次使用的时候取出。sync.Pool 是并发安全的,对于多个 goroutine 同时创建对象或取出缓存对象是完全ok的。文档中是这么介绍 sync.Pool 的:

// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.

最典型的使用就是 fmt 包中的 newPrinter() 函数:

  • 在 golang 编程中大家经常使用的 fmt.Printfmt.Printffmt.Println 都会被被转到 FPrint 系列函数并传入 os.Stdout(一点题外内容)。那我们选一个比较典型的 fmt.FPrintf 来看内容:

    func Fprintf(w io.Writer, format string, a ...any) (n int, err error) {
    	p := newPrinter()
    	p.doPrintf(format, a)
    	n, err = w.Write(p.buf)
    	p.free()
    	return
    }
    
  • Fprintf 在第一行使用了 newPrinter 函数获取了 pp 类型的对象,我们深入到 newPrinter 当中:

    // newPrinter allocates a new pp struct or grabs a cached one.
    func newPrinter() *pp {
    	p := ppFree.Get().(*pp)
    	p.panicking = false
    	p.erroring = false
    	p.wrapErrs = false
    	p.fmt.init(&p.buf)
    	return p
    }
    
  • 再次注意第一行,其使用了 ppFreeGet 函数获取了一个对象并将其转换成了 pp 类型的指针。而这里的 ppFree 就是 sync.Pool 的实例:

    var ppFree = sync.Pool{
    	New: func() any { return new(pp) },
    }
    
  • fmt 包利用这种方法复用了 pp (其实是复用了 pp 中的 buffer),来减少内存的重复分配和GC的压力。

sync.Pool 的设计也是非常完美的,在使用时我们只会接触到三个函数 NewPutGet

  • 在实例化 sync.Pool 时需要提供 New 函数,其作用是在 Pool 中对象不够时新建对象;
  • Put 函数一般都会在对象被使用后归还时调用。
  • Get 函数就是在获取对象时使用。

Map

go 内嵌的 map 类型并不是并发安全的,如果需要并发安全的 map 我们有两种选择:

  1. map + Mutex加锁
  2. sync.Map

go 官方文档中是这么介绍 sync.Map 的:

// Map is like a Go map[interface{}]interface{} but is safe for concurrent use
// by multiple goroutines without additional locking or coordination.
// Loads, stores, and deletes run in amortized constant time.
//
// The Map type is specialized. Most code should use a plain Go map instead,
// with separate locking or coordination, for better type safety and to make it
// easier to maintain other invariants along with the map content.
//
// The Map type is optimized for two common use cases: (1) when the entry for a given
// key is only ever written once but read many times, as in caches that only grow,
// or (2) when multiple goroutines read, write, and overwrite entries for disjoint
// sets of keys. In these two cases, use of a Map may significantly reduce lock
// contention compared to a Go map paired with a separate Mutex or RWMutex.

sync.Map 的使用不需要进行额外的加锁就能保证并发安全。但是多数情况下 go 官方还是推荐使用 buildtin map 类型。sync.Map 适合在以下两种情况下使用:

  1. 读操作的次数明显大于写操作,每一个 key 只被写一次
  2. 多个 goroutine 同时读、写和重写有交集的主键集

以上两种情况下,sync.Map 的使用效率可能会相比于 builtin map 加 Mutex 明显地提高很多。

这是因为在 sync.Map 的读效率非常高的同时,其写入效率会非常低,大多情况下会低于 builtin map + Mutex 的方式,Map测试文档引用:The power of Go concurrent reading and writing sync.map

sync.Map 中公开的 API 比较多:

  • Load
    • 与 builtin map 的读基本一致,传入 key 返回 value 和一个布尔值,布尔值代表 key 是否存在
  • Store
    • 与 builtin map的写基本一致,只需传入 key 和 value 即可
  • LoadOrStore
    • 该方法需要传入 key 和 value
    • 函数首先寻找当前 key 是否存在,若存在返回其 value 和 true,若不存在将提供的 value 设为 key 的值并返回 value 和 false
  • LoadAndDelete
    • 该方法需要传入 key
    • 函数首先寻找当前 key 是否存在,若存在删除并返回其 value 和 true,否则返回 nil 和 false
  • Delete
    • 与 builtin map 的删除操作基本一致,传入 key 即可
  • Range
    • 该方法比较特殊,其需要一个以 key 和 value 作为参数布尔值作为返回值的函数作为其参数
    • 函数遍历 map 中的所有 key 和 value,对其执行传入的函数,过程中只要有一对 key value 返回 false 遍历停止。

gosdk 中的 sync 包大多只是为了一些特殊情况而设计的,在平时的编程中还是鼓励大家使用 golang 推崇的 CSP 编程模式,使用 channel + goroutine 的方式设计并发模型。sync 包中还有一部分很重要的内容即 atomic 包,其内部包含很多 CAS 相关内容,有机会将在以后的文章中继续与大家分享。

 类似资料: