我们已经知道, 原子操作即是进行过程中不能被中断的操作。也就是说, 针对某个值的原子操作在被进行的过程当中, CPU 绝不会再去进行其它的针对该值的操作。为了实现这样的严谨性, 原子操由 CPU 提供芯片级别的支持, 所以绝对有效, 即使在拥有多 CPU 核心, 或者多 CPU 的计算机系统中, 原子操作的保证也是不可撼动的。这使得原子操作可以完全地消除竞态条件, 并能够绝对地保证并发安全性, 它的执行速度要比其他的同步工具快得多, 通常会高出好几个数量级。
不过它的缺点也很明显, 正因为原子操作不能被中断, 所以它需要足够简单, 并且要求快速。你可以想象一下, 如果原子操作迟迟不能完成, 而它又不会被中断, 那么将会给计算机执行指令的效率带来多么大的影响, 所以操作系统层面只对针对二进制位或整数的原子操作提供了支持。
因此, 我们可以结合实际情况, 来判断是否可以将锁替换成原子操作。
atomic 包中提供了如下以 Load 为前缀的增减操作:
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
载入操作能够保证原子的读变量的值, 当读取的时候, 任何其他 CPU 操作都无法对该变量进行读写, 其实现机制受到底层硬件的支持。
假设我已经保证了对一个变量的写操作都是原子操作, 比如: 加或减、存储、交换等等, 那我对它进行读操作的时候, 还有必要使用原子操作吗?
答案是很有必要, 你可以对照一下读写锁, 为什么在读写锁保护下的写操作和读操作之间是互斥的? 这是为了防止读操作读到没有被修改完的值, 如果写操作还没有进行完, 读操作就来读了, 那么就只能读到仅修改了一部分的值, 这显然破坏了值的完整性。因此一旦决定要对一个共享资源进行保护, 那就要做到完全的保护, 不完全的保护基本上与不保护没有什么区别。
atomic 包中提供了如下以 Store 为前缀的存储操作:
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
此类操作确保了写变量的原子性, 避免其他操作读到了修改变量过程中的脏数据。
然后对于存储, 需要掌握 2 条规则:
nil
作为参数值传入原子值的 Store
方法, 否则就会引发一个 panic。这里要注意, 如果有一个接口类型的变量, 它的动态值是 nil
, 但动态类型却不是 nil
, 那么它的值就不等于 nil
, 这样一个变量的值是可以被存入原子值, 这块知识可以在接口这一章中查看。string
类型的值, 那我在后面就只能用该原子值来存储字符串了。如果我又想用它存储结构体, 那么在调用它的 Store
方法的时候就会引发一个 panic, 这个 panic 会告诉我, 这次存储的值的类型与之前的不一致。atomic 包中提供了如下以 Add 为前缀的增减操作:
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
需要注意的是, 第一个参数必须是指针类型的值, 通过指针变量可以获取被操作数在内存中的地址, 从而施加特殊的 CPU 指令, 确保同一时间只有一个 goroutine 能够进行操作, 看个简单的示例:
func Foo() {
var opts int64 = 0
for i := 0; i < 50; i++ {
// 注意第一个参数必须是地址
atomic.AddInt64(&opts, 3) //加操作
//atomic.AddInt64(&opts, -1) 减操作
time.Sleep(time.Millisecond)
}
time.Sleep(time.Second)
fmt.Println("opts: ", atomic.LoadInt64(&opts))
}
用于原子加法操作的函数可以做原子减法吗? 比如 atomic.AddInt32
函数可以用于减小那个被操作的整数值吗? atomic.AddInt32
函数的第二个参数代表差量, 它的类型 int32 是有符号的, 如果我们想做原子减法, 那么把这个差量设置为负整数就可以了, 对于 atomic.AddInt64
函数来说也是类似的。不过如果想用 atomic.AddUint32
和 atomic.AddUint64
函数做原子减法, 因为它们的第二个参数的类型 uint32 和 uint64 都是无符号的, 就不能同 AddInt32 进行相同处理, 但是可以依据下面这个表达式来给定 atomic.AddUint32
函数的第二个参数值:
^uint32(-N-1))
其中的 N
代表由负整数表示的差量, 我们先要把差量的绝对值减去 1
, 然后再把得到的这个无类型的整数常量, 转换为 uint32 类型的值, 最后在该值之上做按位异或操作, 就可以获得最终的参数值。简单来说, 此表达式的结果值的补码, 与使用前一种方法得到的值的补码相同, 所以这两种方式是等价的。
// AtomicMinusUint64 returns the new changed value, the origin value will be changed.
func AtomicMinusUint64(addr *uint64, minus int64) uint64 {
if minus < 0 {
minus = -minus // Even though subtraction, the value should be positive.
}
// https://pkg.go.dev/sync/atomic#AddUint64
// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)).
// In particular, to decrement x, do AddUint64(&x, ^uint64(0)).
return atomic.AddUint64(addr, ^uint64(minus-1))
}
Test case:
// go test -v -timeout 30s -run ^TestAtomicMinusUint64$ gitlab.xxx.com/xx
func TestAtomicMinusUint64(t *testing.T) {
cases := []struct {
name string
val uint64
minus int64
out uint64
}{
{
name: "positive",
val: 20,
minus: 9,
out: 11,
},
{
name: "negative",
val: 20,
minus: -9,
out: 11,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
out := AtomicMinusUint64(&tc.val, tc.minus)
assert.Equal(t, tc.out, out)
})
}
}
该操作简称 CAS(Compare And Swap), 第一个参数的值应该是指向被操作值的指针值, 该值的类型即为* int32, 后两个参数的类型都是 int32 类型, 它们的值应该分别代表被操作值的旧值和新值, 函数在被调用之后会先判断参数 addr 指向的被操作值与参数 old 的值是否相等。仅当此判断得到肯定的结果之后, 该函数才会用参数 new 代表的新值替换掉原先的旧值。否则, 后面的替换操作就会被忽略。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
当有大量的 goroutine 对变量进行读写操作时, 可能导致 CAS 操作无法成功, 这时可以利用 for 循环多次尝试:
var value int64
func atomicAddOp(tmp int64) {
for {
oldValue := value
if atomic.CompareAndSwapInt64(&value, oldValue, oldValue+tmp) {
return
}
}
}
比较并交换操作与交换操作相比有什么不同, 优势在哪里呢? 比较并交换操作即 CAS 操作, 是有条件的交换操作, 只有在条件满足的情况下才会进行值的交换。CAS 操作并不是单一的操作, 而是一种操作组合, 这与其他的原子操作都不同。正因为如此, 它的用途要更广泛一些, 例如我们将它与 for
语句联用就可以实现一种简易的自旋锁(spinlock):
for {
if atomic.CompareAndSwapInt32(&num2, 10, 0) {
fmt.Println("The second number has gone to zero.")
break
}
time.Sleep(time.Millisecond * 500)
}
atomic 包中提供了如下以 Swap 为前缀的交换操作:
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
相对于 CAS, 明显此类操作更为暴力直接, 并不管变量的旧值是否被改变, 直接赋予新值然后返回背替换的值。
对于原子操作, 还有几条具体的使用建议:
除了上述使用建议之外, 我还要再特别强调一点: 尽量不要向原子值中存储引用类型的值。因为这很容易造成安全漏洞。请看下面的代码:
var box6 atomic.Valuev6 := []int{1, 2, 3}
box6.Store(v6)v6[1] = 4 // 注意, 此处的操作不是并发安全的!
我把一个[]int
类型的切片值 v6
存入了原子值 box6
, 由于切片类型属于引用类型, 我在外面改动这个切片值, 就等于修改了 box6
中存储的那个值, 这相当于绕过了原子值而进行了非并发安全的操作, 那么应该怎样修补这个漏洞呢? 可以这样做:
store := func(v []int) {
replica := make([]int, len(v))
copy(replica, v)
box6.Store(replica)
}
store(v6)
v6[2] = 5 // 此处的操作是安全的。
我先为切片值 v6
创建了一个完全的副本, 这个副本涉及的数据已经与原值毫不相干, 然后再把这个副本存入 box6
, 因此无论我再对 v6
的值做怎样的修改, 都不会破坏 box6
提供的安全保护。
// 环形队列
type RingBuffer struct {
err error
count int32
size int32
head int32
tail int32
buf []unsafe.Pointer
}
// Get 方法从 buf 中取出对象
func (r *RingBuffer) Get() interface{} {
// 在高并发开始的时候, 队列容易空, 直接判断空性能最优
if atomic.LoadInt32(&r.count) <= 0 {
return nil
}
// 当扣减数量后没有超, 就从队列里取出对象
if atomic.AddInt32(&r.count, -1) >= 0 {
idx := (atomic.AddInt32(&r.head, 1) - 1) % r.size
if obj := atomic.LoadPointer(&r.buf[idx]); obj != unsafe.Pointer(nil) {
o := *(*interface{})(obj)
atomic.StorePointer(&r.buf[idx], nil)
return o
}
} else {
// 当减数量超了, 再加回去
atomic.AddInt32(&r.count, 1)
}
return nil
}
// Put 方法将对象放回到 buf 中。如果 buf 满了, 返回 false
func (r *RingBuffer) Put(obj interface{}) bool {
// 在高并发结束的时候, 队列容易满, 直接判满性能最优
if atomic.LoadInt32(&r.count) >= r.size {
return false
}
// 当增加数量后没有超, 就将对象放到队列里
if atomic.AddInt32(&r.count, 1) <= r.size {
idx := (atomic.AddInt32(&r.tail, 1) - 1) % r.size
atomic.StorePointer(&r.buf[idx], unsafe.Pointer(&obj))
return true
}
// 当加的数量超了, 再减回去
atomic.AddInt32(&r.count, -1)
return false
}