当前位置: 首页 > 工具软件 > Atomic > 使用案例 >

Go Atomic

芮宇航
2023-12-01

1. Go Atomic

我们已经知道, 原子操作即是进行过程中不能被中断的操作。也就是说, 针对某个值的原子操作在被进行的过程当中, CPU 绝不会再去进行其它的针对该值的操作。为了实现这样的严谨性, 原子操由 CPU 提供芯片级别的支持, 所以绝对有效, 即使在拥有多 CPU 核心, 或者多 CPU 的计算机系统中, 原子操作的保证也是不可撼动的。这使得原子操作可以完全地消除竞态条件, 并能够绝对地保证并发安全性, 它的执行速度要比其他的同步工具快得多, 通常会高出好几个数量级。

不过它的缺点也很明显, 正因为原子操作不能被中断, 所以它需要足够简单, 并且要求快速。你可以想象一下, 如果原子操作迟迟不能完成, 而它又不会被中断, 那么将会给计算机执行指令的效率带来多么大的影响, 所以操作系统层面只对针对二进制位或整数的原子操作提供了支持。

因此, 我们可以结合实际情况, 来判断是否可以将锁替换成原子操作。

1.1. 读取

atomic 包中提供了如下以 Load 为前缀的增减操作:

func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)

载入操作能够保证原子的读变量的值, 当读取的时候, 任何其他 CPU 操作都无法对该变量进行读写, 其实现机制受到底层硬件的支持。

假设我已经保证了对一个变量的写操作都是原子操作, 比如: 加或减、存储、交换等等, 那我对它进行读操作的时候, 还有必要使用原子操作吗?

答案是很有必要, 你可以对照一下读写锁, 为什么在读写锁保护下的写操作和读操作之间是互斥的? 这是为了防止读操作读到没有被修改完的值, 如果写操作还没有进行完, 读操作就来读了, 那么就只能读到仅修改了一部分的值, 这显然破坏了值的完整性。因此一旦决定要对一个共享资源进行保护, 那就要做到完全的保护, 不完全的保护基本上与不保护没有什么区别。

1.2. 赋值

atomic 包中提供了如下以 Store 为前缀的存储操作:

func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)

此类操作确保了写变量的原子性, 避免其他操作读到了修改变量过程中的脏数据。

然后对于存储, 需要掌握 2 条规则:

  • 我们不能把 nil 作为参数值传入原子值的 Store 方法, 否则就会引发一个 panic。这里要注意, 如果有一个接口类型的变量, 它的动态值是 nil, 但动态类型却不是 nil, 那么它的值就不等于 nil, 这样一个变量的值是可以被存入原子值, 这块知识可以在接口这一章中查看。
  • 我们向原子值存储的第一个值, 决定了它今后能且只能存储哪一个类型的值。例如, 我第一次向一个原子值存储了一个 string 类型的值, 那我在后面就只能用该原子值来存储字符串了。如果我又想用它存储结构体, 那么在调用它的 Store 方法的时候就会引发一个 panic, 这个 panic 会告诉我, 这次存储的值的类型与之前的不一致。

1.3. 加法

atomic 包中提供了如下以 Add 为前缀的增减操作:

func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

需要注意的是, 第一个参数必须是指针类型的值, 通过指针变量可以获取被操作数在内存中的地址, 从而施加特殊的 CPU 指令, 确保同一时间只有一个 goroutine 能够进行操作, 看个简单的示例:

func Foo() {
	var opts int64 = 0
	for i := 0; i < 50; i++ {
		// 注意第一个参数必须是地址
		atomic.AddInt64(&opts, 3) //加操作
		//atomic.AddInt64(&opts, -1) 减操作
		time.Sleep(time.Millisecond)
	}
	time.Sleep(time.Second)
	fmt.Println("opts: ", atomic.LoadInt64(&opts))
}

1.4. 减法

用于原子加法操作的函数可以做原子减法吗? 比如 atomic.AddInt32 函数可以用于减小那个被操作的整数值吗? atomic.AddInt32 函数的第二个参数代表差量, 它的类型 int32 是有符号的, 如果我们想做原子减法, 那么把这个差量设置为负整数就可以了, 对于 atomic.AddInt64 函数来说也是类似的。不过如果想用 atomic.AddUint32atomic.AddUint64 函数做原子减法, 因为它们的第二个参数的类型 uint32 和 uint64 都是无符号的, 就不能同 AddInt32 进行相同处理, 但是可以依据下面这个表达式来给定 atomic.AddUint32 函数的第二个参数值:

^uint32(-N-1))

其中的 N 代表由负整数表示的差量, 我们先要把差量的绝对值减去 1, 然后再把得到的这个无类型的整数常量, 转换为 uint32 类型的值, 最后在该值之上做按位异或操作, 就可以获得最终的参数值。简单来说, 此表达式的结果值的补码, 与使用前一种方法得到的值的补码相同, 所以这两种方式是等价的。

1.4.1. 减法封装函数

// AtomicMinusUint64 returns the new changed value, the origin value will be changed.
func AtomicMinusUint64(addr *uint64, minus int64) uint64 {
	if minus < 0 {
		minus = -minus // Even though subtraction, the value should be positive.
	}

	// https://pkg.go.dev/sync/atomic#AddUint64
	// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)).
	// In particular, to decrement x, do AddUint64(&x, ^uint64(0)).
	return atomic.AddUint64(addr, ^uint64(minus-1))
}

Test case:

// go test -v -timeout 30s -run ^TestAtomicMinusUint64$ gitlab.xxx.com/xx
func TestAtomicMinusUint64(t *testing.T) {
	cases := []struct {
		name  string
		val   uint64
		minus int64
		out   uint64
	}{
		{
			name:  "positive",
			val:   20,
			minus: 9,
			out:   11,
		},
		{
			name:  "negative",
			val:   20,
			minus: -9,
			out:   11,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			out := AtomicMinusUint64(&tc.val, tc.minus)
			assert.Equal(t, tc.out, out)
		})
	}
}

1.5. 比较并交换

该操作简称 CAS(Compare And Swap), 第一个参数的值应该是指向被操作值的指针值, 该值的类型即为* int32, 后两个参数的类型都是 int32 类型, 它们的值应该分别代表被操作值的旧值和新值, 函数在被调用之后会先判断参数 addr 指向的被操作值与参数 old 的值是否相等。仅当此判断得到肯定的结果之后, 该函数才会用参数 new 代表的新值替换掉原先的旧值。否则, 后面的替换操作就会被忽略。

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

当有大量的 goroutine 对变量进行读写操作时, 可能导致 CAS 操作无法成功, 这时可以利用 for 循环多次尝试:

var value int64

func atomicAddOp(tmp int64) {
	for {
		oldValue := value
		if atomic.CompareAndSwapInt64(&value, oldValue, oldValue+tmp) {
			return
		}
	}
}

比较并交换操作与交换操作相比有什么不同, 优势在哪里呢? 比较并交换操作即 CAS 操作, 是有条件的交换操作, 只有在条件满足的情况下才会进行值的交换。CAS 操作并不是单一的操作, 而是一种操作组合, 这与其他的原子操作都不同。正因为如此, 它的用途要更广泛一些, 例如我们将它与 for 语句联用就可以实现一种简易的自旋锁(spinlock):

	for {
		if atomic.CompareAndSwapInt32(&num2, 10, 0) {
			fmt.Println("The second number has gone to zero.")
			break
		}
		time.Sleep(time.Millisecond * 500)
	}

1.6. 交换

atomic 包中提供了如下以 Swap 为前缀的交换操作:

func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

相对于 CAS, 明显此类操作更为暴力直接, 并不管变量的旧值是否被改变, 直接赋予新值然后返回背替换的值。

1.7. 扩展知识

对于原子操作, 还有几条具体的使用建议:

  1. 不要把内部使用的原子值暴露给外界。比如, 声明一个全局的原子变量并不是一个正确的做法, 这个变量的访问权限最起码也应该是包级私有的。
  2. 如果不得不让包外, 或模块外的代码使用你的原子值, 那么可以声明一个包级私有的原子变量, 然后再通过一个或多个公开的函数, 让外界间接地使用到它。注意, 这种情况下不要把原子值传递到外界, 不论是传递原子值本身还是它的指针值。
  3. 如果通过某个函数可以向内部的原子值存储值的话, 那么就应该在这个函数中先判断被存储值类型的合法性。若不合法, 则应该直接返回对应的错误值, 从而避免 panic 的发生。
  4. 如果可能的话, 我们可以把原子值封装到一个数据类型中, 比如一个结构体类型。这样, 我们既可以通过该类型的方法更加安全地存储值, 又可以在该类型中包含可存储值的合法类型信息。

除了上述使用建议之外, 我还要再特别强调一点: 尽量不要向原子值中存储引用类型的值。因为这很容易造成安全漏洞。请看下面的代码:

var box6 atomic.Valuev6 := []int{1, 2, 3}
box6.Store(v6)v6[1] = 4 // 注意, 此处的操作不是并发安全的! 

我把一个[]int类型的切片值 v6 存入了原子值 box6, 由于切片类型属于引用类型, 我在外面改动这个切片值, 就等于修改了 box6 中存储的那个值, 这相当于绕过了原子值而进行了非并发安全的操作, 那么应该怎样修补这个漏洞呢? 可以这样做:

	store := func(v []int) {
		replica := make([]int, len(v))
		copy(replica, v)
		box6.Store(replica)
	}
	store(v6)
	v6[2] = 5 // 此处的操作是安全的。

我先为切片值 v6 创建了一个完全的副本, 这个副本涉及的数据已经与原值毫不相干, 然后再把这个副本存入 box6, 因此无论我再对 v6 的值做怎样的修改, 都不会破坏 box6 提供的安全保护。

1.8. 实战场景: 环形队列

// 环形队列
type RingBuffer struct {
	err   error
	count int32
	size  int32
	head  int32
	tail  int32
	buf   []unsafe.Pointer
}

// Get 方法从 buf 中取出对象
func (r *RingBuffer) Get() interface{} {
	// 在高并发开始的时候, 队列容易空, 直接判断空性能最优
	if atomic.LoadInt32(&r.count) <= 0 {
		return nil
	}

	// 当扣减数量后没有超, 就从队列里取出对象
	if atomic.AddInt32(&r.count, -1) >= 0 {
		idx := (atomic.AddInt32(&r.head, 1) - 1) % r.size
		if obj := atomic.LoadPointer(&r.buf[idx]); obj != unsafe.Pointer(nil) {
			o := *(*interface{})(obj)
			atomic.StorePointer(&r.buf[idx], nil)
			return o
		}
	} else {
		// 当减数量超了, 再加回去
		atomic.AddInt32(&r.count, 1)
	}
	return nil
}

// Put 方法将对象放回到 buf 中。如果 buf 满了, 返回 false
func (r *RingBuffer) Put(obj interface{}) bool {
	// 在高并发结束的时候, 队列容易满, 直接判满性能最优
	if atomic.LoadInt32(&r.count) >= r.size {
		return false
	}
	// 当增加数量后没有超, 就将对象放到队列里
	if atomic.AddInt32(&r.count, 1) <= r.size {
		idx := (atomic.AddInt32(&r.tail, 1) - 1) % r.size
		atomic.StorePointer(&r.buf[idx], unsafe.Pointer(&obj))
		return true
	}
	// 当加的数量超了, 再减回去
	atomic.AddInt32(&r.count, -1)
	return false
}

1.9. 参考

  1. atomic Documentation
 类似资料:

相关阅读

相关文章

相关问答