多线程原子操作方法(atomic,compare_and_swap)

寇靖
2023-12-01

在前面的多处理器编程艺术文章中,谈到了lock free算法。多线程编程通常碰到的问题是很多操作的非原子性(non-atomic)。
本文通过程序实例,说明多线程中由于访问同一数据变量可能造成写丢失的情况。并通过mutex、atomic、compare_and_swap逻辑完成对于多线程的控制。
1.非线程安全方式
多个线程并发地将一个变量相加n次。

volatile unsigned long counter = 0;

void
Add(void *arg)
{
	int i;
	int iter = * ((int*)arg);

	for (i = 0; i < iter; i++)
	{
		++counter;   //非线程安全的
	}

	return NULL;
}

int
main(int argc, char *argv[]) 
{
	int i, num, count;
	pthread_t *t;
	
	if (argc != 3)
	{
		fprintf(stderr, "usage %s: <nthreads> <iterations>\n", argv[0]);

		return 1;
	}

	num = atoi(argv[1]);
	count = atoi(argv[2]);

	/* create threads */
	t = (pthread_t *) malloc(num * sizeof(pthread_t));
	for (i = 0; i < num; i++)
	{
		assert(!pthread_create(&t[i], NULL, Add, (void *) &count));
	}

	for (i = 0; i < num; i++)
	{
		assert(!pthread_join(t[i], NULL));
	}

	printf("all thread done, the result is:%lu\n", counter);

}

其中,++counter语句为非线程安全的。我们通过反汇编查看具体的操作步骤:(gdb -q ./unsafe_add ,输入dsias Add查看结果)

8048540 <Add+16>:	mov    0x8049988,%eax
0x08048545 <Add+21>:	add    $0x1,%edx
0x08048548 <Add+24>:	add    $0x1,%eax
0x0804854b <Add+27>:	cmp    %ecx,%edx
0x0804854d <Add+29>:	mov    %eax,0x8049988


其中可以看出++counter实际上按照三条语句执行。
a.先将内存中的变量保存在eax寄存器中;
b.将寄存器内容+1;
c.将寄存器中的内容写回内存;
多线程运行时,可能出现交差的情况,如下所示:
MOV counter,%rax thread-0
MOV counter,%rax thread-1
ADD 0x01,%rax thread-0
ADD 0x01,%rax thread-1
MOV %rax,counter thread-0
MOV %rax,counter thread-1
此时多个线程执行,会将部分的执行结果覆盖,所以,创建5个线程,各增加100000次,可能出现的最后结果为:
all thread done, the result is:268565
2.互斥锁方式
通过互斥访问的方法,在同一时刻只能有一个线程进入临界区进行操作。
采用pthread库,使用mutex对象控制各线程对于counter变量的访问。

void
Add(void *arg)
{
	int i;
	int iter = * ((int*)arg);

	for (i = 0; i < iter; i++)
	{
		pthread_mutex_lock(&mutex);
		++counter;  //critical section
		pthread_mutex_unlock(&mutex);
	}

	return NULL;
}

同一时刻,只能由一个线程改变counter的值。
注:此时counter要声明为volatile,不然可能会被有的编译器"虐".可以参加该链接。
http://concurrencykit.org/doc/concurrencyPrimitives.html#3.1
3. 原子操作方式
现代的CPU支持原子操作的指令。比如:xadd,配合lock指令,完成fetch_and_add操作。可以参见:
http://lists.cs.uiuc.edu/pipermail/llvmdev/2004-December/002793.html
http://en.wikipedia.org/wiki/Fetch-and-add#x86_implementation
我们通过gcc内联汇编的方式实现原子增加方法:

static inline unsigned int AtomicAdd(volatile unsigned long *mem, unsigned long add)
{
	unsigned long __tmp = add;
		__asm__ __volatile__("lock xaddq %0,%1"
			:"+r" (add),
			"+m" (*mem)
			: : "memory");

	return add + __tmp;
}

4. GCC Atomic Builtins方式
gcc从4.1版本开始提供了原子操作。可以参见:
http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
我们采用__sync_fetch_and_add方式。

void
Add(void *arg)
{
	int i;
	int iter = * ((int*)arg);

	for (i = 0; i < iter; i++)
	{
		__sync_fetch_and_add(&counter, 1);
	}

	return NULL;
}

5.lock free方式
现代cpu主要通过cmpxchg指令来达到lockfree的效果,也就是compare_and_swap机制。可以参见:
http://en.wikipedia.org/wiki/Non-blocking_algorithm
http://en.wikipedia.org/wiki/Compare-and-swap
compare_and_swap机制逻辑如下:

/* atomic */
bool compare_and_swap (unsigned long *mem, unsigned long old, unsigned long new)
{
	if (*mem == old) {
		*mem = new
		return true;
	}
	return false;
}

比较内存mem中的值是否与给定的old值一致时,用new值替换mem中的数据。考虑++counter语句,线程将counter进行拷贝,增加后再更新counter,如果通过cas比较counter的值与拷贝的值不能,则说明有其他线程已经更新了counter,需要重新load,否则更新counter。

static inline char CAS(volatile unsigned long *mem, unsigned long old, unsigned long new)
{
	unsigned long r;
	__asm__ __volatile__("lock cmpxchg %k2, %1"
						: "=a" (r), "+m" (*mem)
						: "r" (new), "0" (old)
						: "memory");

	return r == old ? 1 : 0;
}

6.测试
编写如下脚本,对于不同的方式执行时间进行测试:

for pg in atomic_add gcc_add mutex_add cas_add; do
#for pg in  mutex_add; do
	for (( i=1; $i <= 10; ++i)); do
		let nops=1000000/$i
		echo -n "$i $nops"
		time  ./$pg $i $nops 1> /dev/null
	done
done

可以看出mutex方式由于涉及到线程的切换,所花费的时间代价很大。

相关的代码为:
lockfree.tar
注:其中的xadd、cmpxchg指令是在64位主机下使用的,如果在32位主机下,可能需要使用xaddl、cmpxchgl指令。

http://www.financecomputing.net/wordpress/?p=245

 类似资料: