前言
最近在做分块上传的业务,使用到了Redis来维护上传过程中的分块编号。
每上传完成一个分块就获取一下文件的分块集合,加入新上传的编号,手动接口测试下是没有问题的,前端通过并发上传调用就出现问题了,并发的get再set,就会存在覆盖写现象,导致最后的分块数据不对,不能触发分块合并请求。
遇到并发二话不说先上锁,针对执行代码块加了一个JVM锁之后问题就解决了。
仔细一想还是不太对,项目是分布式部署的,做了负载均衡,一个节点的代码被锁住了,请求轮询到其他节点还是可以进行覆盖写,并没有解决到问题啊
没办法,只有用上分布式锁了。之前对于分布式锁的理论还是很熟悉的,没有比较好的应用场景就没写过具体代码,趁这个机会就学习使用一下分布式锁。
理论
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。是为了解决分布式系统中,不同的系统或是同一个系统的不同主机共享同一个资源的问题,它通常会采用互斥来保证程序的一致性
通常的实现方式有三种:
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
本文就使用的是Redis的setnx实现,如果Redis是多机版的可以去了解下Radssion,封装的就特别的好,也是官方推荐的
代码
1. 加依赖
引入Spring Boot和Redis整合的快速使用依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2. 加配置
application.properties中加入Redis连接相关配置
spring.redis.host=xxx spring.redis.port=6379 spring.redis.database=0 spring.redis.password=xxx spring.redis.timeout=10000 # 设置jedis连接池 spring.redis.jedis.pool.max-active=50 spring.redis.jedis.pool.min-idle=20
3. 重写Redis的序列化规则
默认使用的JDK的序列化,不自己设置一下Redis中的数据是看不懂的
/** * @author Chkl * @create 2020/6/7 * @since 1.0.0 */ @Component public class RedisConfig { /** * 改造RedisTemplate,重写序列化规则,避免存入序列化内容看不懂 * @param connectionFactory * @return */ @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(connectionFactory); // 设置key和value的序列化规则 redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer(Object.class)); redisTemplate.setKeySerializer(new StringRedisSerializer()); return redisTemplate; } }
4. 如何正确的上锁
直接上代码
@Component public class RedisLock { @Autowired private StringRedisTemplate redisTemplate; private long timeout = 3000; /** * 上锁 * @param key 锁标识 * @param value 线程标识 * @return 上锁状态 */ public boolean lock(String key, String value) { long start = System.currentTimeMillis(); while (true) { //检测是否超时 if (System.currentTimeMillis() - start > timeout) { return false; } //执行set命令 Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);//1 //是否成功获取锁 if (absent) { return true; } return false; } } }
核心代码就是
Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
setIfAbsent方法就相当于命令行下的Setnx方法,指定的 key 不存在时,为 key 设置指定的值
参数分别是key、value、超时时间和时间单位
5. 如何正确解锁
@Component public class RedisLock { @Autowired private StringRedisTemplate redisTemplate; @Autowired private DefaultRedisScript<Long> redisScript; private static final Long RELEASE_SUCCESS = 1L; /** * 解锁 * @param key 锁标识 * @param value 线程标识 * @return 解锁状态 */ public boolean unlock(String key, String value) { //使用Lua脚本:先判断是否是自己设置的锁,再执行删除 Long result = redisTemplate.execute(redisScript, Arrays.asList(key,value)); //返回最终结果 return RELEASE_SUCCESS.equals(result); } /** * @return lua脚本 */ @Bean public DefaultRedisScript<Long> defaultRedisScript() { DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText("if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end"); return defaultRedisScript; } }
解锁过程需要两步操作
1.判断操作线程是否是加锁的线程
2.如果是加锁线程,执行解锁操作
这两步操作也需要原子的进行操作,但是Redis不支持这两步的合并的操作,所以,就只有使用lua脚本实现来保证原子性咯
如果在判断是加锁的线程之后,并且执行解锁之前,锁到期了,被其他线程获得锁了,这时候再进行解锁就会解掉其他线程的锁,使得不满足解铃还须系铃人
6. 实际应用
没有使用分布式锁时的保存文件分块的代码
/** * 保存文件分块编号到redis * @param chunkNumber 分块号 * @param identifier 文件唯一编号 * @return 文件分块的大小 */ @Override public Integer saveChunk(Integer chunkNumber, String identifier) { //从Redis获取已经存在的分块编号集合 Set<Integer> oldChunkNumber = (Set<Integer>) JSON.parseObject(redisOperator.get("chunkNumberList_"+identifier),Set.class); //如果不存在分块集合,创建一个集合 if (Objects.isNull(oldChunkNumber)) { Set<Integer> newChunkNumber = new HashSet<>(); newChunkNumber.add(chunkNumber); redisOperator.set("chunkNumberList_"+identifier, JSON.toJSONString(newChunkNumber),36000); return newChunkNumber.size(); //如果分块集合已经存在了,就添加一个编号 } else { oldChunkNumber.add(chunkNumber); redisOperator.set("chunkNumberList_"+identifier, JSON.toJSONString(oldChunkNumber),36000); return oldChunkNumber.size(); } }
存在的问题是:当并发的请求进来之后,可能获取同一个状态的集合进行修改,修改后直接写入,造成同一个状态获得的集合操作线程覆盖写的现象
使用分布式锁保证同时只能有一个线程能获取到集合并进行修改,避免了覆盖写现象
使用分布式锁代码
/** * 保存文件分块编号到redis * @param chunkNumber 分块号 * @param identifier 文件唯一编号 * @return 文件分块的大小 */ @Override public Integer saveChunk(Integer chunkNumber, String identifier) { //通过UUID生成一个请求线程识别标志作为锁的value String threadUUID = CoreUtil.getUUID(); //上锁,以共享资源标识:文件唯一编号,作为key,以线程标识UUID作为value redisLock.lock(identifier,threadUUID); //从Redis获取已经存在的分块编号集合 Set<Integer> oldChunkNumber = (Set<Integer>) JSON.parseObject(redisOperator.get("chunkNumberList_"+identifier),Set.class); //如果不存在分块集合,创建一个集合 if (Objects.isNull(oldChunkNumber)) { Set<Integer> newChunkNumber = new HashSet<>(); newChunkNumber.add(chunkNumber); redisOperator.set("chunkNumberList_"+identifier, JSON.toJSONString(newChunkNumber),36000); //解锁 redisLock.unlock(identifier,threadUUID); return newChunkNumber.size(); //如果分块集合已经存在了,就添加一个编号 } else { oldChunkNumber.add(chunkNumber); redisOperator.set("chunkNumberList_"+identifier, JSON.toJSONString(oldChunkNumber),36000); //解锁 redisLock.unlock(identifier,threadUUID); return oldChunkNumber.size(); } }
代码中使用的共享资源标识是文件唯一编号identifier,它能标识加锁代码段中的唯一资源,即key为"chunkNumberList_"+identifier的集合
代码中使用的线程唯一标识是UUID,能保证加锁和解锁时获取的标识不会重复
到此这篇关于SpringBoot整合Redis正确的实现分布式锁的示例代码的文章就介绍到这了,更多相关SpringBoot整合Redis分布式锁内容请搜索小牛知识库以前的文章或继续浏览下面的相关文章希望大家以后多多支持小牛知识库!
本文向大家介绍springboot整合Mybatis、JPA、Redis的示例代码,包括了springboot整合Mybatis、JPA、Redis的示例代码的使用技巧和注意事项,需要的朋友参考一下 引言 在springboot 项目中,我们是用ORM 框架来操作数据库变的非常方便。下面我们分别整合mysql ,spring data jpa 以及redis 。让我们感受下快车道。 我们首先创建一
背景 在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等。大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。 一、使用分布式锁要满足的几个条件: 系统是一个分布式系统(关键是分布式,单机的可以
本文向大家介绍SpringBoot集成Redisson实现分布式锁的方法示例,包括了SpringBoot集成Redisson实现分布式锁的方法示例的使用技巧和注意事项,需要的朋友参考一下 上篇 《SpringBoot 集成 redis 分布式锁优化》对死锁的问题进行了优化,今天介绍的是 redis 官方推荐使用的 Redisson ,Redisson 架设在 redis 基础上的 Java 驻内存
本文向大家介绍Java编程redisson实现分布式锁代码示例,包括了Java编程redisson实现分布式锁代码示例的使用技巧和注意事项,需要的朋友参考一下 最近由于工作很忙,很长时间没有更新博客了,今天为大家带来一篇有关Redisson实现分布式锁的文章,好了,不多说了,直接进入主题。 1. 可重入锁(Reentrant Lock) Redisson的分布式可重入锁RLock Java对象实现
本文向大家介绍Redis实现分布式锁和等待序列的方法示例,包括了Redis实现分布式锁和等待序列的方法示例的使用技巧和注意事项,需要的朋友参考一下 在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 ReentrankLock 这些锁的作用范围都是 JVM ,说白了在集群下没啥用。这时我们就需要能在多台 JVM 之间决定执行顺序的锁了
本文向大家介绍SpringBoot整合UEditor的示例代码,包括了SpringBoot整合UEditor的示例代码的使用技巧和注意事项,需要的朋友参考一下 当前开发项目涉及到富文本框,了解了不少富文本编辑器之后,最终决定使用度娘的UEditor。原因:功能强大,并且自带适配java后端的图片和视频上传。 项目地址 不多说,上一下该项目的地址: http://ueditor.baidu.com/