当前位置: 首页 > 工具软件 > Tair > 使用案例 >

基于tair的分布式锁实现原理

鲜于玮
2023-12-01

分布式锁概述

分布式锁的实现主要分为三种方式:
1、基于Mysql的行锁实现

优点:
实现简单,不需要额外的中间件来协助实现
缺点:

  • 增大了数据库的读写压力
  • 可能增大数据库的死锁的产生。例如:如果琐是数据库中的行A,然后甲获得A的锁后执行操作,然后请求资源B,而乙当前正持有B的资源却在请求分布式锁A,那么引起死锁。

2、基于Zookeeper的实现(可以设计为公平锁、也可以设计为非公平锁)
基本原理其实是基于Zookeeper的强一致性和顺序一致性特点,实现方式如下:
在Zookeeper中建立一个路径route,当某服务器需要请求锁资源时就在route下写入临时顺序节点,然后观察自己是否是当前路径下的最小节点,如果是则说明当前自己持有锁,如果不是则启动监听器监听前一节点的删除时间,当前一节点删除后自己获得锁。由于当本节点在设置监听器的时候可能前一节点已经被删除了,从而引起死锁,但是在Zookeeper中设置监听器的操作是原子性的,从而避免这个问题。

优点:
1、实现简单
2、由于是临时节点,所以天然没有死锁的问题
缺点:
需要接入zk,并且需要在客户端引入复杂的逻辑

3、基于Tair、Redis等分布式缓存中间件的实现

优点:简单、高效,且公司都有部署这些缓存中间件


基于Tair的分布式锁实现方案

基本原理是在全局下Tair只用一个唯一控制点,从而实现加锁控制。

1、基于incr和decr实现

在Tair中提供了两个函数:

Result incr(int namespace, Serializable key, int value, int defaultValue, 
            int expireTime, int lowBound, int upperBound)
Result decr(int namespace, Serializable key, int value, int defaultValue, 
            int expireTime, int lowBound, int upperBound)

可以为namespace空间下key下的值增加或者减少value的大小,同时可以设置值的最大值和最小值。
因此这种分布式锁的实现方式为:
1、设置key下的值的最大值为1,最小值为0.默认值为0
2、当某服务器需要加锁时就尝试为key值加1,如果当前value为0则成功并且加锁,否则返回失败,调用函数:
incr(namespace, key, 1,0, expireTime, 0, 1);
3、加锁成功后执行事务,然后解锁即可
缺点:

  • 该锁是不可重入的,因为无法通过value区分是哪台服务器加的锁。
  • 由于无法根据value判断是哪一台服务器,因此会出现锁被其它服务器误解的情况


2、基于get/put/invalide方式实现

实现方式如下:
1、当服务器需要获取锁时调用get(namespace,key)查看当前key下value是否存在,如果存在说明当前锁已被占用。
2、如果当前value存在则说明锁为空,则通过put(namespace,key,version,value,expireTime)上锁,即为key下写入value,注意这个地方version一定为2,因此在put时如果有其它锁一起在抢,当锁被抢走后下次再写value的version已经变为三,因此其它服务器无法写入,只能继续等待。
3、最后通过invalid(namespace,key,value)解锁,即删除该key值下的value。

分布式锁DistributedLock概览

使用方法

DistributedLock锁是注解形式,使用方法是在需要加锁执行的方法上写上注解,如下:

@DistributedLock(customizeLockKey = "test")
public String test1(){
    for (int i = 0; i < 100; i++) {
        System.out.println("Thread1 run...");
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    return "Thread1 finish";
}

其中加的锁就是字符串test。

代码测试

定义两个分别加分布式锁的方法,但是加的锁不一样

@Component
public class ConcurentApply{
    @DistributedLock(customizeLockKey = "test1")
    public String test2(){
        for (int i = 0; i < 100; i++) {
            System.out.println("Thread2 run...");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return "Thread2 finish";
    }
    @DistributedLock(customizeLockKey = "test")
    public String test1(){
        for (int i = 0; i < 100; i++) {
            System.out.println("Thread1 run...");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return "Thread1 finish";
    }
}

然后定义一个继承自Thread的类来调用test2方法来实现两个方法并发执行:

@Component
public class Thread1 extends Thread{
    @Autowired
    private ConcurentApply apply;
    public void run(){
        System.out.println(apply.test2());
    }
}

然后再业务代码中调用这两个方法,让这两个带锁的方法并发执行,查看效果:

@Autowired
private lateinit var td: Thread1;

@Autowired
private lateinit var apply: ConcurentApply;


override fun queryAlgos(param: ZimaAlgoParam?): List<ZimaAlgo?>? {
    td.start();
    apply.test1();
    return algoDao.queryAlgos(param)
}

执行后系统输出如下:

[TairLock]tryLock success, key=TM_M_P_DistributedLock_test, value=B-M1AEMD6R-2128.local__http-nio-7001-exec-1__5137d1f6-3ff9-4421-97a0-2a9ccdf1d9d8
[TairLock]tryLockWait success, key=TM_M_P_DistributedLock_test
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
... ... ... ...
[TairLock]tryLock success, key=TM_M_P_DistributedLock_test1, value=B-M1AEMD6R-2128.local__Thread-83__a44dd850-6d58-42a2-9228-6fdb197c54f1
[TairLock]tryLockWait success, key=TM_M_P_DistributedLock_test1
Thread2 run...
Thread1 run...
Thread2 run...
Thread1 run...
Thread2 run...
Thread1 run...
Thread2 run...
Thread1 run...
Thread2 run...
Thread1 run...
Thread2 run...
Thread1 run...
Thread2 run...
Thread1 run...
... ... ... ...
[TairLock]unlock success, key=TM_M_P_DistributedLock_test, threadValue=B-M1AEMD6R-2128.local__http-nio-7001-exec-1__5137d1f6-3ff9-4421-97a0-2a9ccdf1d9d8
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
... ... ... ...
[TairLock]unlock success, key=TM_M_P_DistributedLock_test1, threadValue=B-M1AEMD6R-2128.local__Thread-83__a44dd850-6d58-42a2-9228-6fdb197c54f1
Thread2 finish

可以看到,虽然两个方法都加锁了,但是由于两个方法加了不同的锁,因此两个方法相互不影响,具体是Thread1先加锁成功,然后开始运行test1方法,在运行途中Thread2加锁成功,两个方法并发运行,然后一次释放锁,直到结束。
将两个方法都加上同样的锁test,

@Component
public class ConcurentApply{
    @DistributedLock(customizeLockKey = "test")
    public String test2(){
        for (int i = 0; i < 100; i++) {
            System.out.println("Thread2 run...");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return "Thread2 finish";
    }
    @DistributedLock(customizeLockKey = "test")
    public String test1(){
        for (int i = 0; i < 100; i++) {
            System.out.println("Thread1 run...");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return "Thread1 finish";
    }
}

重新测试,测试结果如下:

[TairLock]tryLock success, key=TM_M_P_DistributedLock_test, value=B-M1AEMD6R-2128.local__http-nio-7001-exec-6__2694d927-fa03-4ab0-b130-f86d78823fe6
[TairLock]tryLockWait success, key=TM_M_P_DistributedLock_test
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
... ... ... ...
[TairLock]tryLock fail,  key=TM_M_P_DistributedLock_test, tairLockManager.putValue.ResultCode=code=-3997, msg=version error. Always happened in highly concurrent write circumstances, try reduce concurrency
[TairLock]tryLockWait, key=TM_M_P_DistributedLock_test
[TairLock]tryLockWait Wait 300 ms For Continue, Now Already Wait For 0 ms.
Thread1 run...
Thread1 run...
Thread1 run...
[TairLock]tryLock fail,  key=TM_M_P_DistributedLock_test, tairLockManager.getValue.getRc()=code=0, msg=success
[TairLock]tryLockWait Wait 500 ms For Continue, Now Already Wait For 300 ms.
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
[TairLock]tryLock fail,  key=TM_M_P_DistributedLock_test, tairLockManager.getValue.getRc()=code=0, msg=success
[TairLock]tryLockWait Wait 900 ms For Continue, Now Already Wait For 800 ms.
Thread1 run...
Thread1 run...
Thread1 run...
... ... ... ...
[TairLock]tryLock fail,  key=TM_M_P_DistributedLock_test, tairLockManager.getValue.getRc()=code=0, msg=success
[TairLock]tryLockWait Wait 1700 ms For Continue, Now Already Wait For 1700 ms.
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
... ... ... ...
[TairLock]tryLock fail,  key=TM_M_P_DistributedLock_test, tairLockManager.getValue.getRc()=code=0, msg=success
[TairLock]tryLockWait Wait 3300 ms For Continue, Now Already Wait For 3400 ms.
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
Thread1 run...
... ... ... ...
[TairLock]unlock success, key=TM_M_P_DistributedLock_test, threadValue=B-M1AEMD6R-2128.local__http-nio-7001-exec-6__2694d927-fa03-4ab0-b130-f86d78823fe6
[TairLock]tryLock success, key=TM_M_P_DistributedLock_test, value=B-M1AEMD6R-2128.local__Thread-83__01447b6c-4395-45bb-b4e3-1fb9f169baea
[TairLock]tryLockWait success, key=TM_M_P_DistributedLock_test, alreadyWaitSec=6700
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
Thread2 run...
... ... ... ...
[TairLock]unlock success, key=TM_M_P_DistributedLock_test, threadValue=B-M1AEMD6R-2128.local__Thread-83__01447b6c-4395-45bb-b4e3-1fb9f169baea
Thread2 finish

可以看到,Thread1首先抢到分布式锁,然后开始执行方法,此时由于锁已经被占用,导致Thread2一直尝试获得锁失败,不断陷入等待,知道Thread1将锁释放后Thread2才占有锁开始执行,最后释放。

分布式锁DistributedLock源码实现

分布式锁DistributedLock的最底层实现是基于Tair的get/put方式来实现的,源码解析如下:
首先是DistributedLock注解的定义:

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class DistributedLock(
        val alone: Boolean = false,               // 是否为单机锁
        val methodOnly: Boolean = true,           // 是否仅锁定方法
        val customizeLockKey: String = "",        // 自定义锁名
        val lockKey: Array<String> = emptyArray() // 主动指定方法名以外锁定的key
)

该注解只定义在方法上,并且默认采用分布式锁,然后代码中通过一个叫做LockAdivisior的类来拦截该注解,代码如下:

@Aspect
@Component
open class LockAdvisor {
    val GOD_LOCK = Object()

    @Pointcut("@annotation(com.tmall.marketing.annotations.DistributedLock)")
    fun needLock() {
    }

    @Around("needLock()")
    open fun invoke(joinPoint: ProceedingJoinPoint): Any? {
        ......
        return result
    }

    @Qualifier("tairLocker")
    @Autowired
    lateinit var locker: TairLocker
}

可以看到在该类中通过AOP注解@Pointcut来拦截DistributedLock,将所有被DistributedLock注解修饰的方法定义为切点,并为这些切点方法织入一个Around的增强方法,在这个增强方法中来实现加锁等操作。Around增强方法具体如下(Kotlin代码):

@Around("needLock()")
open fun invoke(joinPoint: ProceedingJoinPoint): Any? {
    println("增强");
    val methodMeta = joinPoint.signature as MethodSignature

    val method = methodMeta.method
    val config = method.getAnnotation(DistributedLock::class.java)

    var result: Any? = null

    // 单机锁
    if (config.alone) {
        synchronized(GOD_LOCK) {
            result = joinPoint.proceed(joinPoint.args)
        }
    } else {
        var lockKey = "TM_M_P_DistributedLock_"

        try {
            when {
                // 自定义锁名
                config.customizeLockKey.isNotEmpty() -> {
                    lockKey += config.customizeLockKey
                }
                // 用户主动指定锁住的参数
                config.lockKey.isNotEmpty() -> {
                    config.lockKey.forEach {
                        lockKey += joinPoint.args[methodMeta.parameterNames.indexOf(it)]
                    }
                }
                config.methodOnly -> {
                    //使用用户自定义名称+方法名称来定义琐名
                    lockKey += method.name
                }
                else -> {
                    lockKey += (method.name + joinPoint.args.joinToString())
                }
            }
        } catch (ex: Throwable) {
            lockKey += (method.name + joinPoint.args.joinToString())    // 如果生成lockKey失败,使用方法名+全参数作为key
        }

        try {
            if (locker.tryLockWait(lockKey)) {
                result = joinPoint.proceed(joinPoint.args)
            }
        } finally {
            locker.unlock(lockKey)
        }
    }

    return result
}

@Qualifier("tairLocker")
@Autowired
lateinit var locker: TairLocker

其实该方法中主要逻辑不多,首先获得当前的注解DistributedLock属性,如果alone属性为true说明是本地锁,则直接通过synchronized执行即可
如果是分布式锁则先通过DistributedLock中的customizeLockKey属性拼接lockKey,然后根据lockKey去获取锁,如果没获取到则等待,直到获取到锁或者超时

locker.tryLockWait(lockKey)

获取到锁后开始执行原始方法,最后释放锁即可。
其中比较关键的就是tryLockWait和unlock方法是怎么实现的,观察到这两个方法其实是组合属性locker: TairLocker来实现的,观察者类TairLocker。

TairLocker锁的源码实现

TairLocker类的属性和方法如下:

/**
 * 使用 Tair 实现的分布式锁, Copy from <a href="https://www.atatech.org/articles/77540">
 */
@Component
public class TairLocker {

    /**
     * 不能为0或1,其他值都可以:0表示强制更新,如果是1则第二次刚好是2,也不能保证锁
     */
    private final int DEFAULT_VERSION = 2;

    /**
     * 超时时间,单位为秒
     */
    private final int EXPIRE_TIME = 10;

    /**
     * wait重试最大次数,间隔时间2的指数级增长
     */
    private final int LOCK_GET_WAIT_TIMES = 10;

    /**
     * wait重试base时间ms
     */
    private final int LOCK_GET_WAIT_BASE_TIME = 100;

    /**
     * wait重试递增时间ms
     */
    private final int LOCK_GET_WAIT_GAP = 200;

    /**
     * wait重试最大时间ms
     */
    private final int LOCK_GET_WAIT_TOTAL_TIME = 60 * 1000;

    /**
     * get重试次数
     */
    private final int LOCK_GET_MAX_RETRY = 3;

    /**
     * put重试次数
     */
    private final int LOCK_PUT_MAX_RETRY = 3;


    @Autowired
    private TairManager tairManager;

    @Value("${tair.jiuxi.namespace}")
    private int tairNamespace;

    /**
     * 避免delete了非本线程put的值
     */
    private final ThreadLocal<String> currentThreadValue = new InheritableThreadLocal<>();

    /**
     * 尝试获取锁,get或put失败都分别进行重试,支持重入
     */
    public boolean tryLock(String key, String reqValue)

    /**
     * 不可重入
     */
    public boolean tryLock(String key)

    /**
     * 自旋 wait重试获取锁,与unlock配对使用
     */
    public boolean tryLockWait(String key)

    /**
     * 判断是否是当前线程持有的锁,释放锁
     */
    public boolean unlock(String key)

    //是否获取锁失败
    private boolean isGetFail(Result<DataEntry> result) 
	
    //是否加锁失败
    private boolean isPutFail(ResultCode code)

    /**
     * 用于防止被其他线程非法unlock
     */
    private String genValue()
}

TairLocker类中主要关注以下几个重点:

  • 1、在TairLocker类中组合了一个TairManager对象,这个对象才是真正实现get/put/invalid的实现类对象。
  • 2、该类主要是在TairManager类上进行了一层逻辑封装而已,主要封装出了三个方法:
    • tryLock:尝试get+put去获取一个锁,成功返回true,否则失败
    • tryLockWait:不断尝试使用tryLock方法去获取一个锁,直到超过最大次数或者超过最大时间
    • unlock:调用invalid函数解锁,多了一层校验,只有获得锁的线程才可以解这个锁

tryLock方法

下面分别分析这三个方法,首先是tryLock方法,tryLock方法主要就是尝试一次获取和加锁,源码和注解如下:

/**
 * 尝试获取锁,get或put失败都分别进行重试,支持重入
 */
public boolean tryLock(String key, String reqValue) {
    int retryGet = 0;
    Result<DataEntry> result = null;
    // 支持重试
    while (retryGet++ < LOCK_GET_MAX_RETRY && isGetFail(result)) {//一直尝试获得锁知道成功或者超过次数
        //通过tair尝试获取key下的值,如果当前琐被占用直接返回失败,如果没有tail服务返回空,如果发现该key下没有value则返回成功
        result = tairManager.get(tairNamespace, key);
    }

    //没有该服务
    if (result == null) {
        System.out.println("[TairLock] tryLock, maybe Tair service  is unavailable");
        return false;
    }

    String value = reqValue;
    //说明当前琐还没有被占用
    if (ResultCode.DATANOTEXSITS.equals(result.getRc())) {
        //生成一个随机的value来写入到该key
        if (value == null) {
            value = genValue();
        }
        ResultCode code = null;
        int retryPut = 0;
        //开始put加锁
        while (retryPut++ < LOCK_PUT_MAX_RETRY && isPutFail(code)) { //一直尝试加锁直到成功或者超过最大次数
            //通过tail的put函数尝试加锁
            code = tairManager.put(tairNamespace, key, value, DEFAULT_VERSION, EXPIRE_TIME);
        }
        //加锁成功,返回true
        if (code != null && code.isSuccess()) {
            currentThreadValue.set(value);
            System.out.println(String.format("[TairLock]tryLock success, key=%s, value=%s", key, value));
            return true;
        } else {//加锁失败,返回false
            System.out.println(String.format("[TairLock]tryLock fail,  key=%s, tairLockManager.putValue.ResultCode=%s", key, code));
            return false;
        }
    } else {//当前琐已经被占用了
        //这个代码就是可从入琐的逻辑,如果发现琐已经被占用了并且琐的value和自己的value一样则直接返回true,说明是重入琐
        if (reqValue != null && result.getValue() != null && reqValue.equals(result.getValue().getValue())) {
            System.out.println(String.format("[TairLock]tryLock true,reenterable lock,  key=%s, reqValue=%s", key, reqValue));
            return true;
        }
        //返回失败
        System.out.println(String.format("[TairLock]tryLock fail,  key=%s, tairLockManager.getValue.getRc()=%s", key, result.getRc()));
        return false;
    }
}

/**
 * 不可重入
 */
public boolean tryLock(String key) {
    return tryLock(key, null);
}

上述方法注解已经写的很详细了,总结来说就是逻辑如下:

  • 先采用get方法查看锁是否可以获取,
  • 如果当前锁还没有被占用,采用put方法去获取锁,如果成功了返回true,失败了返回false
  • 如果当前锁已经被占用了,则通过传入的value和当前锁的value对比,如果一致说明是自己加的锁,返回true,实现锁的可重入,(这也说明为什么tryLock(String key)是不可重入的)
  • 否则返回false,表明锁获取失败


tryLockWait方法

/**
 * 自旋 wait重试获取锁,与unlock配对使用
 */
public boolean tryLockWait(String key) {
    //用常量池,确保获取的是同一个监视器
    key = key.intern();

    //通过get和put的组合拳尝试一次加锁,成功返回true
    boolean rs = tryLock(key);
    //说明加锁成功
    if (rs) {
        System.out.println(String.format("[TairLock]tryLockWait success, key=%s", key));
        return rs;
    } else {//加锁失败
        System.out.println(String.format("[TairLock]tryLockWait, key=%s", key));
        long alreadyWaitSec = 0;
        //!!! 主要是如果在同一个进程中有抢占同一个锁的线程,则通过synchronized让一个线程去抢就好了
        synchronized (key) {
            for (int i = 0; i < LOCK_GET_WAIT_TIMES; i++) {//开始反复尝试等待获得锁并加锁 10次
                //如果当前已经等待超过了最大时间直接返回false  60s
                if (alreadyWaitSec > LOCK_GET_WAIT_TOTAL_TIME) {
                    System.out.println(String.format("[TairLock]tryLockWait 超过循环等待最大时间=%s(ms),跳出循环,不再等待", LOCK_GET_WAIT_TOTAL_TIME));
                    return false;
                }
                //当前这次等待入琐的时间,100+2^i*200
                long waitFor = LOCK_GET_WAIT_BASE_TIME + Math.round(Math.pow(2, i) * LOCK_GET_WAIT_GAP);
                // 如果当前时间和此次等待时间累加后超过最大等待时间就等待到最大时间即可
                if (alreadyWaitSec + waitFor > LOCK_GET_WAIT_TOTAL_TIME) {
                    System.out.println(String.format("[TairLock]tryLockWait Wait %d Sec For Continue (the last time), Now Already Wait For %d ms.", LOCK_GET_WAIT_TOTAL_TIME - alreadyWaitSec, alreadyWaitSec));
                    try {
                        //等待到最大时间即可
                        key.wait(LOCK_GET_WAIT_TOTAL_TIME - alreadyWaitSec);
                    } catch (InterruptedException e) {
                        System.out.println("[TairLock] tryLockWait fail InterruptedException");
                        return false;
                    }
                } else {//按照waitFor正常等待
                    System.out.println(String.format("[TairLock]tryLockWait Wait %d ms For Continue, Now Already Wait For %d ms.", waitFor, alreadyWaitSec));
                    try {
                        key.wait(waitFor);
                    } catch (InterruptedException e) {
                        System.out.println("[TairLock] tryLockWait fail InterruptedException");
                        return false;
                    }
                }

                alreadyWaitSec += waitFor;
                //再等待一段时间后重试
                rs = tryLock(key);
                //本次等待后加锁成功了
                if (rs) {
                    try {
                        //唤醒其他等待该key的线程别等了,赶紧准备等我执行完程序后抢锁
                        key.notifyAll();
                    } catch (Exception e) {
                        System.out.println("[TairLock] unlock notifyAll fail");
                    }

                    System.out.println(String.format("[TairLock]tryLockWait success, key=%s, alreadyWaitSec=%s", key, alreadyWaitSec));
                    return rs;
                }
            }
            //等待到了最大次数或者时间都没有加锁成功,则直接返回false了
            System.out.println(String.format("[TairLock]tryLockWait fail max retry times=%s, key=%s", LOCK_GET_WAIT_TIMES, key));
            return rs;
        }
    }
}

tryLockWait方法的大致逻辑如下:

  • 首先先使用tryLock尝试一次获取锁,成功则直接返回true
  • 失败后进入循环等待加锁,首先先等待一段时间,每次等待的时间会呈指数增长,等待完成后再次tryLock尝试,成功则返回true,失败则继续循环加锁
  • 如果超过了最大循环次数或最大等待时间,停止循环,返回false


unLock方法

/**
 * 判断是否是当前线程持有的锁,释放锁
 */
public boolean unlock(String key) {
    //先获取该被占用的琐的value值
    Result<DataEntry> result = tairManager.get(tairNamespace, key);
    String threadValue = currentThreadValue.get();
    if (threadValue != null && result != null && result.getValue() != null) {
        String value = (String) result.getValue().getValue();
        //比较被占用的锁和自身已经加过的琐的值是否相同
        if (threadValue.equals(value)) {
            //如果相同,则解锁
            ResultCode rc = tairManager.invalid(tairNamespace, key);
            if (rc != null && rc.isSuccess()) {
                System.out.println(String.format("[TairLock]unlock success, key=%s, threadValue=%s"
                        , key, threadValue));
                currentThreadValue.remove();

                return true;
            } else {
                System.out.println(String.format("[TairLock]unlock failed, tairLockManager.invalidValue fail, key=%s, ResultCode=%s"
                        , key, rc));
                return false;
            }
        } else {
            System.out.println(String.format("[TairLock]unlock failed,value is not equal threadValue, key=%s, threadValue=%s, value=%s"
                    , key, threadValue, value));
            return false;
        }
    }

    System.out.println(String.format("[TairLock]unlock failed, key=%s, threadValue=%s, result=%s"
            , key, threadValue, result));
    return false;
}

unLock方法比较简单,不多介绍

总结

该分布式锁采用基于Tair的get/put/invalid方式实现的,同时该锁是可重入的,并且可以防止误删。

  • 当锁已经被占用后通过比较自己的value和锁的value值来判断是否是自己加的锁,从而实现可重入
  • 在加锁时保存自己存入的value值,在解锁时与锁的value比较,防止误删。
  • 其实能够实现这些的原因就是因为不同的线程在加锁的时候写入的value是不同的,从而可以区分每个线程,线程间的并发通过version版本号来解决
  • 其实通过incr和Decr也是可以实现可重入的,只要在封装一个中间层即可,保存自己当前的持有锁状态,但是没有办法防止误删除,因为没有办法判断当前锁是属于谁的
 类似资料: