我在之前的文章SpringBoot基于Redisson实现分布式锁并分析其原理介绍了分布式锁的使用。今天我们介绍的是一款基于redis的分布式锁Spring-boot-starterr组件,使得项目拥有分布式锁能力变得异常简单,支持spring boot,和spirng mvc等spring相关项目。基于注解驱动,支持spring Spel,方便的定义锁的key的粒度。本文中Spring Boot的版本是2.5.2,spring-boot-klock-starter的版本是1.4-RELEASE。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.alian</groupId>
<artifactId>redis-klock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redis-klock</name>
<description>Spring Boot整合klock分布式锁</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>cn.keking</groupId>
<artifactId>spring-boot-klock-starter</artifactId>
<version>1.4-RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
server:
port: 8081
servlet:
context-path: /klock
spring:
klock:
#单节点地址
address: 192.168.0.193:6379
#密码,没有密码就不要配置password
#password:
#获取锁最长阻塞时间(默认:60,单位:秒)
wait-time: 20
#已获取锁后自动释放时间(默认:60,单位:秒)
lease-time: 20
spring:
klock:
#密码
#password:
#获取锁最长阻塞时间(默认:60,单位:秒)
wait-time: 20
#已获取锁后自动释放时间(默认:60,单位:秒)
lease-time: 20
#集群配置
cluster-server:
node-addresses: 192.168.0.111:6379,192.168.0.112:6379,192.168.0.113:6379,192.168.0.101:6379,192.168.0.102:6379,192.168.0.103:6379,192.168.0.114:6379,192.168.0.104:6379
单节点和集群配置主要区别在于spring.klcok.address 和 spring.klcok.cluster-server.node-addresses二选一。
spring-boot-klock-starter这个是基于Spring-boot-starter的一个组件,里面具体的实现还是使用了Redisson,关于Redisson的实现原理,我开篇已经提到了,我另外的文章有详细的介绍,我们这次所了解的是Klock的一些常见使用。
Klock.java
package org.springframework.boot.autoconfigure.klock.annotation;
import org.springframework.boot.autoconfigure.klock.model.LockTimeoutStrategy;
import org.springframework.boot.autoconfigure.klock.model.LockType;
import org.springframework.boot.autoconfigure.klock.model.ReleaseTimeoutStrategy;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*
* @author kl
* @since 2017/12/29
* Content :加锁注解
*/
@Target(value = {ElementType.METHOD})
@Retention(value = RetentionPolicy.RUNTIME)
public @interface Klock {
/**
* 锁的名称
* @return name
*/
String name() default "";
/**
* 锁类型,默认可重入锁
* @return lockType
*/
LockType lockType() default LockType.Reentrant;
/**
* 尝试加锁,最多等待时间
* @return waitTime
*/
long waitTime() default Long.MIN_VALUE;
/**
*上锁以后xxx秒自动解锁
* @return leaseTime
*/
long leaseTime() default Long.MIN_VALUE;
/**
* 自定义业务key
* @return keys
*/
String [] keys() default {};
/**
* 加锁超时的处理策略
* @return lockTimeoutStrategy
*/
LockTimeoutStrategy lockTimeoutStrategy() default LockTimeoutStrategy.NO_OPERATION;
/**
* 自定义加锁超时的处理策略
* @return customLockTimeoutStrategy
*/
String customLockTimeoutStrategy() default "";
/**
* 释放锁时已超时的处理策略
* @return releaseTimeoutStrategy
*/
ReleaseTimeoutStrategy releaseTimeoutStrategy() default ReleaseTimeoutStrategy.NO_OPERATION;
/**
* 自定义释放锁时已超时的处理策略
* @return customReleaseTimeoutStrategy
*/
String customReleaseTimeoutStrategy() default "";
}
这个核心接口,包含了我们使用@Klock 注解用到的参数,比如keys、waitTime、leaseTime、lockTimeoutStrategy、releaseTimeoutStrategy等,新版本中还增加了对自定义加锁超时策略及释放锁超时策略,增加了程序的可扩展性。
LockTimeoutStrategy.java
package org.springframework.boot.autoconfigure.klock.model;
import org.aspectj.lang.JoinPoint;
import org.springframework.boot.autoconfigure.klock.handler.KlockTimeoutException;
import org.springframework.boot.autoconfigure.klock.handler.lock.LockTimeoutHandler;
import org.springframework.boot.autoconfigure.klock.lock.Lock;
import java.util.concurrent.TimeUnit;
/**
* @author wanglaomo
* @since 2019/4/15
**/
public enum LockTimeoutStrategy implements LockTimeoutHandler {
/**
* 继续执行业务逻辑,不做任何处理
*/
NO_OPERATION() {
@Override
public void handle(LockInfo lockInfo, Lock lock, JoinPoint joinPoint) {
// do nothing
}
},
/**
* 快速失败
*/
FAIL_FAST() {
@Override
public void handle(LockInfo lockInfo, Lock lock, JoinPoint joinPoint) {
String errorMsg = String.format("Failed to acquire Lock(%s) with timeout(%ds)", lockInfo.getName(), lockInfo.getWaitTime());
throw new KlockTimeoutException(errorMsg);
}
},
/**
* 一直阻塞,直到获得锁,在太多的尝试后,仍会报错
*/
KEEP_ACQUIRE() {
private static final long DEFAULT_INTERVAL = 100L;
private static final long DEFAULT_MAX_INTERVAL = 3 * 60 * 1000L;
@Override
public void handle(LockInfo lockInfo, Lock lock, JoinPoint joinPoint) {
long interval = DEFAULT_INTERVAL;
while(!lock.acquire()) {
if(interval > DEFAULT_MAX_INTERVAL) {
String errorMsg = String.format("Failed to acquire Lock(%s) after too many times, this may because dead lock occurs.",
lockInfo.getName());
throw new KlockTimeoutException(errorMsg);
}
try {
TimeUnit.MILLISECONDS.sleep(interval);
interval <<= 1;
} catch (InterruptedException e) {
throw new KlockTimeoutException("Failed to acquire Lock", e);
}
}
}
}
}
加锁超时处理策略(LockTimeoutStrategy):
ReleaseTimeoutStrategy.java
package org.springframework.boot.autoconfigure.klock.model;
import org.springframework.boot.autoconfigure.klock.handler.KlockTimeoutException;
import org.springframework.boot.autoconfigure.klock.handler.release.ReleaseTimeoutHandler;
/**
* @author wanglaomo
* @since 2019/4/15
**/
public enum ReleaseTimeoutStrategy implements ReleaseTimeoutHandler {
/**
* 继续执行业务逻辑,不做任何处理
*/
NO_OPERATION() {
@Override
public void handle(LockInfo lockInfo) {
// do nothing
}
},
/**
* 快速失败
*/
FAIL_FAST() {
@Override
public void handle(LockInfo lockInfo) {
String errorMsg = String.format("Found Lock(%s) already been released while lock lease time is %d s", lockInfo.getName(), lockInfo.getLeaseTime());
throw new KlockTimeoutException(errorMsg);
}
}
}
释放锁时超时处理策略(ReleaseTimeoutStrategy):
UserDto.java
package com.alian.redisklock.dto;
import lombok.Data;
@Data
public class UserDto {
private String userId;
private String userName;
private int age;
private String sex;
public UserDto(String userId, String userName, int age, String sex) {
this.userId = userId;
this.userName = userName;
this.age = age;
this.sex = sex;
}
}
KLockService.java
package com.alian.redisklock.service;
import com.alian.redisklock.dto.UserDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.klock.annotation.Klock;
import org.springframework.boot.autoconfigure.klock.model.LockTimeoutStrategy;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class KLockService {
/**
* 单个key取值举例
*
* @param userId
*/
@Klock(keys = "#userId", lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST)
public void singleKey(String userId) {
log.info("单个key接收到的消息:{}", userId);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("单个key业务处理完");
}
/**
* 多个key取值举例
*
* @param userId
* @param userName
*/
@Klock(keys = {"#userId", "#userName"}, lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST)
public void multiKey(String userId, String userName) {
log.info("多个key接收到的消息:{},{}", userId, userName);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("多个key业务处理完");
}
/**
* 对象取值举例
*
* @param userDto
*/
@Klock(keys = {"#userDto.userId"}, lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST)
public void objectKey(UserDto userDto) {
log.info("对象key接收到的消息:{}", userDto);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("对象key业务处理完");
}
/**
* list取值举例
*
* @param list
*/
@Klock(keys = {"#list[0]"}, lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST)
public void listKey(List<String> list) {
log.info("集合list中key接收到的消息:{}", list.toString());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("集合list中key业务处理完");
}
/**
* map取值举例
*
* @param map
*/
@Klock(keys = {"#map['userId']"}, lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST)
public void mapKey(Map<String, String> map) {
log.info("集合Map中key接收到的消息:{}", map);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("集合Map中key业务处理完");
}
@Klock(keys = "#userId", waitTime = 2, customLockTimeoutStrategy = "customLockTimeout")
public void lockTimeOut(String userId) {
log.info("锁超时接收到的消息:{}", userId);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("锁超时业务处理完");
}
private String customLockTimeout(String userId) {
log.info("自定义策略锁超时: {}", userId);
return "自定义策略锁超时";
}
}
如果是熟悉了不同类型的取值,加上组合使用,那么@Klock 注解使用就简单了,实际上参数上都是可以自定义的,如果你配置文件配置了超时,方法上也配置了,方法上的时间优先。比如配置文件加锁超时时间是20秒,业务方法加锁时间是10秒,那么此时的加锁超时时间就是10秒。
@Klock(keys = "#userId", lockType = LockType.Reentrant, waitTime = 10, leaseTime = 10,
lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST, releaseTimeoutStrategy = ReleaseTimeoutStrategy.FAIL_FAST)
public void demo(String userId) {
log.info("业务处理:{}",userId);
}
我这里就不一个个接口贴出结果了,测试的方法只是注释了,大家放开就可以测试了,就简单的验证下,比如我们同一个key,并发5次,就看是不是按线程顺序执行,也就是说哪个线程获取了锁,哪个线程就执行完,下一个线程再执行。
package com.alian.redisklock.service;
import com.alian.redisklock.dto.UserDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Service
public class TestService {
private final CountDownLatch countDownLatch = new CountDownLatch(1);
@Autowired
private KLockService klockService;
@PostConstruct
public void singleKey() {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//单个key
klockService.singleKey("10001");
//多个key
//klockService.multiKey("10001", "ALian");
//对象取值
//klockService.objectKey(new UserDto("10002", "ALian", 18, "男"));
//list取值
//klockService.listKey(Arrays.asList("10001", "10002", "10003"));
//map取值
//klockService.mapKey(new HashMap<String, String>() {{
// put("userId", "ALian");
// put("userName", "ALian");
// put("age", "22");
//}});
//自定义锁超时策略
//klockService.lockTimeOut("10003");
}, "Thread" + i).start();
}
countDownLatch.countDown();
}
}
单个key运行结果:
2021-10-29 16:59:16 671 [Thread2] INFO :单个key接收到的消息:10001
2021-10-29 16:59:18 678 [Thread2] INFO :单个key业务处理完
2021-10-29 16:59:18 691 [Thread1] INFO :单个key接收到的消息:10001
2021-10-29 16:59:20 700 [Thread1] INFO :单个key业务处理完
2021-10-29 16:59:20 713 [Thread4] INFO :单个key接收到的消息:10001
2021-10-29 16:59:22 728 [Thread4] INFO :单个key业务处理完
2021-10-29 16:59:22 742 [Thread0] INFO :单个key接收到的消息:10001
2021-10-29 16:59:24 744 [Thread0] INFO :单个key业务处理完
2021-10-29 16:59:24 769 [Thread3] INFO :单个key接收到的消息:10001
2021-10-29 16:59:26 782 [Thread3] INFO :单个key业务处理完
当然新版本1.4支持了自定义锁策略,我们也得说下啊,这里有几个关键:
比如上面我的锁超时例子如下:
@Klock(keys = "#userId", waitTime = 2, customLockTimeoutStrategy = "customLockTimeout")
public void lockTimeOut(String userId) {
log.info("锁超时接收到的消息:{}", userId);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("锁超时业务处理完");
}
private String customLockTimeout(String userId) {
log.info("自定义策略锁超时: {}", userId);
return "自定义策略锁超时";
}
为此我们写一个类包含变量级加锁和方法级加锁,另外再写一个service(是不是多此一举?结果会不一样的!!!) 进行验证,看看有什么不同。
VariableAndMethodLockService.java
package com.alian.redisklock.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.klock.annotation.Klock;
import org.springframework.boot.autoconfigure.klock.model.LockTimeoutStrategy;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class VariableAndMethodLockService {
/**
* 变量级加锁
* @param userId
*/
@Klock(keys = "#userId", lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST)
public void variableLock(String userId) {
log.info("变量级加锁收到的消息:{}", userId);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("变量级加锁业务处理完:{}", userId);
}
/**
* 方法级加锁
* @param userId
*/
@Klock
public void methodLock(String userId) {
log.info("方法级加锁收到的消息:{}", userId);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("方法级加锁业务处理完:{}", userId);
}
}
同样的我们还是用5个线程来进行并发验证
TestLockTypeService.java
package com.alian.redisklock.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Service
public class TestLockTypeService {
private final CountDownLatch countDownLatch = new CountDownLatch(1);
@Autowired
private VariableAndMethodLockService vmService;
@PostConstruct
public void testVariableAndMethodLock() {
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//变量级加锁
vmService.variableLock("10001_" + finalI);
//方法级加锁
// vmService.methodLock("10002_"+ finalI);
}, "Thread" + i).start();
}
countDownLatch.countDown();
}
}
2021-10-29 17:42:15 432 [Thread0] INFO :变量级加锁收到的消息:10001_0
2021-10-29 17:42:15 432 [Thread1] INFO :变量级加锁收到的消息:10001_1
2021-10-29 17:42:15 432 [Thread4] INFO :变量级加锁收到的消息:10001_4
2021-10-29 17:42:15 432 [Thread2] INFO :变量级加锁收到的消息:10001_2
2021-10-29 17:42:15 432 [Thread3] INFO :变量级加锁收到的消息:10001_3
2021-10-29 17:42:16 439 [Thread3] INFO :变量级加锁业务处理完:10001_3
2021-10-29 17:42:16 439 [Thread2] INFO :变量级加锁业务处理完:10001_2
2021-10-29 17:42:16 439 [Thread0] INFO :变量级加锁业务处理完:10001_0
2021-10-29 17:42:16 439 [Thread4] INFO :变量级加锁业务处理完:10001_4
2021-10-29 17:42:16 439 [Thread1] INFO :变量级加锁业务处理完:10001_1
2021-10-29 17:43:15 779 [Thread3] INFO :方法级加锁收到的消息:10002_3
2021-10-29 17:43:16 787 [Thread3] INFO :方法级加锁业务处理完:10002_3
2021-10-29 17:43:16 797 [Thread2] INFO :方法级加锁收到的消息:10002_2
2021-10-29 17:43:17 800 [Thread2] INFO :方法级加锁业务处理完:10002_2
2021-10-29 17:43:17 819 [Thread0] INFO :方法级加锁收到的消息:10002_0
2021-10-29 17:43:18 833 [Thread0] INFO :方法级加锁业务处理完:10002_0
2021-10-29 17:43:18 844 [Thread1] INFO :方法级加锁收到的消息:10002_1
2021-10-29 17:43:19 847 [Thread1] INFO :方法级加锁业务处理完:10002_1
2021-10-29 17:43:19 863 [Thread4] INFO :方法级加锁收到的消息:10002_4
2021-10-29 17:43:20 864 [Thread4] INFO :方法级加锁业务处理完:10002_4
结论:
也就是说实际中我们要根据自己的需要合理的选用是方法级加锁还是变量级加锁,很明显变量级加锁的效率更适合高并发场景,而方法级的就可能引起阻塞,从上面的结果可以知道本文中例子都是变量级加锁。
举一个例子,比如对不同的商品进行库存的扣减,如果是方法级别的,并发高时可能会导致线程获取不到锁而超时,有些商品卖得多,有些卖得少,那么不同的商品扣减可能就会受到影响;而如果你按照商品编号进行变量级加锁,那么不同的商品的库存扣减就不会受到影响了,因为每个商品之前的线程是独立的,相同条件下,相比方法级加锁也会提供系统的处理性能。也不是不能用方法级加锁,主要看你用于什么场景。
最常见的就比如手机端录入信息到后台,比如注册之类的等等,用户端可能因为各种原因可能会点击多次,导致后台可能会出现多笔记录的情况,这个时候很简单,用到我们的锁,假设,我们是注册用户,手机号是唯一的。
@Klock(keys = "#phone", lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST)
@RequestMapping("register")
public void register(String phone,String nickName) {
log.info("注册账户收到的信息:{},{}", phone,nickName);
try {
//模拟业务过程
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("注册账户业务处理完");
}
这个时候,如果是点击了两次,第一次业务进入获取到锁进行处理,第二过来了也是一个等待,要么第一次处理完成,第二次业务判断已注册,要么第二次直接超时了。
工作中定时任务使用还是蛮多的,但是也会有很多问题,当遇到分布式服务时,一个服务部署多台,定时任务就可能会同时运行,这种情况怎么处理呢?有些人可能会给两个服务的配置改成不一样,比如定时任务的时间修改,一个正常执行,一个在不可能的时间执行,还有人直接给服务设置一个标志位,只有某个标志位的能执行。好吧,在分布式环境,并且服务不是很多的情况下,也许还能勉强维护,那么如果是容器下呢?所以分布式锁的方案就更加重要了。
@Scheduled(cron = "0 0 2 * * ?")
public void dataCollector(){
//开始做任务
String dataTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
createDataFile(dataTime);
//结束任务
}
@Klock(keys = "#dataTime", lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST)
public void createDataFile(String dataTime) {
log.info("开始生成对账文件:{}", dataTime);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("对账文件生成完成:");
}
这是一个示例,实际中,createDataFile方法是另外一个service中的方法,这样不管是单机,分布式多机,还是在容器中都只有一个定时任务在执行,而不会导致重复数据问题。
其实这个场景是用的最多的,比如商品库存的扣减,因为我们不能超卖啊。实际工作中,需要根据自己的业务定义特定意义的key就可以了。其实我之前的实例,基本都是这个模式,就不过多的讲解了。
@Klock(keys = "#goodId", lockTimeoutStrategy = LockTimeoutStrategy.FAIL_FAST)
public void deductCommodityInventory(String goodId,int num) {
log.info("商品【{}】扣减库存:{}", goodId,num);
//扣减库存操作
//dosomething()
log.info("商品扣减库存完成");
}
最后需要提到的是,锁的粒度一定要把握好,不能过小或者过大。