当前位置: 首页 > 工具软件 > redis-mutex > 使用案例 >

Redis-0408(基础篇完结)

吕修伟
2023-12-01

Redis案例-商品秒杀

 public static boolean doSecKill(String uid, String proid)throws IOException {
        //  1.uid和proid判断
        if(uid == null||proid == null){
            return false;
        }

        // 2.连接jedis
        Jedis jedis = new Jedis("192.168.50.105",6379);

        //  3. 拼接key
        String invenKey = "sk:"+proid+"qt";
        String userKey = "sk:"+proid+"qt";

        //  4. 判断是否开始
        String inven = jedis.get(invenKey);
        if(inven == null){
            System.out.println("秒杀尚未开始");
            jedis.close();
            return false;
        }

        //  5.判断是否抢过
        if(jedis.sismember(userKey,uid)){
            System.out.println("不可重复参与!");
            jedis.close();
            return false;
        }

        //  6.判断库存是否充足
        if(Integer.parseInt(jedis.get(invenKey))<=0){
            System.out.println("秒杀已结束,失败");
            jedis.close();
            return false;
        }

        //  进行库存扣除,添加成功人员名单
        jedis.decr(invenKey);

        jedis.sadd(userKey,uid);
				
   			System.out.println("秒杀成功");
        jedis.close();
        return true;
    }

ab工具模拟并发秒杀

# 更新brew
$ brew update

# 安装 APR
$ brew install apr

# 安装 APR-UTIL (费时有些长)
$ brew install apr-util

# 安装 PCRE
$ brew install pcre

#	安装ab工具
$ brew install httpd-tools

# ab常用命令
$ ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded url
# -n:请求数
# -c:并发数
# -k:开启 HTTP Keep-Alive,客户端在请求服务端的资源时,不会关闭与服务端的连接,
# -p:请求参数存放的文件
# -T:设置类型,就固定写: application/x-www-form-urlencoded
# url:替代为请求连接的url

超时和超卖问题解决

解决超时问题:

连接池

  1. 节省每次连接redis服务带来的消耗,把连接好的实例反复利用。
  2. 通过参数管理连接的行为
  3. 链接池参数
    • MaxTotal:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted。
    • maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;
    • MaxWaitMillis:表示当borrow一个jedis实例时,最大的等待毫秒数,如果超过等待时间,则直接抛JedisConnectionException;
    • estOnBorrow:获得一个jedis实例的时候是否检查连接可用性(ping());如果为true,则得到的jedis实例均是可用的;
public class JedisPoolUtil {
	private static volatile JedisPool jedisPool = null;

	private JedisPoolUtil() {
	}

	public static JedisPool getJedisPoolInstance() {
		if (null == jedisPool) {
			synchronized (JedisPoolUtil.class) {
				if (null == jedisPool) {
					JedisPoolConfig poolConfig = new JedisPoolConfig();
					poolConfig.setMaxTotal(200);
					poolConfig.setMaxIdle(32);
					poolConfig.setMaxWaitMillis(100*1000);
					poolConfig.setBlockWhenExhausted(true);
					poolConfig.setTestOnBorrow(true);  // ping  PONG
				 
					jedisPool = new JedisPool(poolConfig, "192.168.50.105", 6379, 60000 );
				}
			}
		}
		return jedisPool;
	}

	public static void release(JedisPool jedisPool, Jedis jedis) {
		if (null != jedis) {
			jedisPool.close();
			jedis.close();
		}
	}

}

解决超卖问题:

public class SecKill {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();

    }

    public static boolean doSecKill(String uid, String proid) throws IOException {
        //  1.uid和proid判断
        if (uid == null || proid == null) {
            return false;
        }

        // 2.连接jedis,使用连接池,避免超时
        JedisPool jedisPool = new JedisPool();
        Jedis jedis = jedisPool.getResource();

        //  3. 拼接key
        String invenKey = "sk:" + proid + "qt";
        String userKey = "sk:" + proid + "qt";

        //  监视库存
        jedis.watch(invenKey);

        //  4. 判断是否开始
        String inven = jedis.get(invenKey);
        if (inven == null) {
            System.out.println("秒杀尚未开始");
            jedis.close();
            return false;
        }

        //  5.判断是否抢过
        if (jedis.sismember(userKey, uid)) {
            System.out.println("不可重复参与!");
            jedis.close();
            return false;
        }

        //  6.判断库存是否充足
        if (Integer.parseInt(jedis.get(invenKey)) <= 0) {
            System.out.println("秒杀已结束,失败");
            jedis.close();
            return false;
        }

        //  multi事务操作
        Transaction multi = jedis.multi();
        multi.decr(invenKey);
        multi.sadd(userKey, uid);

        List<Object> result = multi.exec();
        if (result == null || result.size() == 0) {
            System.out.println("秒杀失败");
            jedis.close();
            return false;
        }

        System.out.println("秒杀成功");
        jedis.close();
        return true;
    }
}

乐观锁造成的库存遗留问题

redis默认不能直接使用悲观锁,使用LUA脚本语言操作。

public class SecKill_redisByScript {
	
	private static final  org.slf4j.Logger logger =LoggerFactory.getLogger(SecKill_redisByScript.class) ;

	public static void main(String[] args) {
		JedisPool jedispool =  JedisPoolUtil.getJedisPoolInstance();
 
		Jedis jedis=jedispool.getResource();
		System.out.println(jedis.ping());
		
		Set<HostAndPort> set=new HashSet<HostAndPort>();

		// doSecKill("201","sk:0101");
	}
	
	static String secKillScript ="local userid=KEYS[1];\r\n" + 
			"local prodid=KEYS[2];\r\n" + 
			"local qtkey='sk:'..prodid..\":qt\";\r\n" + 
			"local usersKey='sk:'..prodid..\":usr\";\r\n" + 
			"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" + 
			"if tonumber(userExists)==1 then \r\n" + 
			"   return 2;\r\n" + 
			"end\r\n" + 
			"local num= redis.call(\"get\" ,qtkey);\r\n" + 
			"if tonumber(num)<=0 then \r\n" + 
			"   return 0;\r\n" + 
			"else \r\n" + 
			"   redis.call(\"decr\",qtkey);\r\n" + 
			"   redis.call(\"sadd\",usersKey,userid);\r\n" + 
			"end\r\n" + 
			"return 1" ;
			 
	static String secKillScript2 = 
			"local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" +
			" return 1";

	public static boolean doSecKill(String uid,String prodid) throws IOException {

		JedisPool jedispool =  JedisPoolUtil.getJedisPoolInstance();
		Jedis jedis=jedispool.getResource();

		 //String sha1=  .secKillScript;
		String sha1=  jedis.scriptLoad(secKillScript);
		Object result= jedis.evalsha(sha1, 2, uid,prodid);

		  String reString=String.valueOf(result);
		if ("0".equals( reString )  ) {
			System.err.println("已抢空!!");
		}else if("1".equals( reString )  )  {
			System.out.println("抢购成功!!!!");
		}else if("2".equals( reString )  )  {
			System.err.println("该用户已抢过!!");
		}else{
			System.err.println("抢购异常!!");
		}
		jedis.close();
		return true;
	}
}

Redis持久化

Redis 提供了2个不同形式的持久化方式。

  • RDB(Redis DataBase)
  • AOF(Append Of File)

RDB方式

  1. 简介:
    • 在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里
  2. 备份如何执行:
    • Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失(原因是可能会因为数据库服务器在时间间隔到达前挂掉,导致最后添加的数据没有备份)。
  3. Fork:
    • Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程
    • 在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了“写时复制技术
    • 在Redis的持久化操作中,写时复制技术主要是用于RDB持久化方式下的操作。当Redis进行RDB快照持久化时,会将当前内存中的数据快照到磁盘中,如果此时Redis仍然有写操作,那么就需要使用写时复制技术来保证数据的一致性。
    • 一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。
    • 总之,父进程写,子进程负责复制,在复制之前,先建立一个新的临时文件,临时文件的内容完全复制过去之后,才去替换dump.rdb文件。
    • ==写时复制技术:==多个调用者同时请求相同资源,他们会共享相同的资源,直到某个调用者试图修改内容时,系统才会复制一份副本给该调用者,而其他调用者所见到的最初资源仍然保持不变。所以在这里,如果父进程企图写,则会给父进程开辟一个新空间,写完再去合并,如果子进程写磁盘,也会有一个新空间开辟,写完之后再合并。
  4. 配置:
    • 文件名称:dbfilename dump.rdb
    • 配置位置:dir ./
    • 如何触发RDB快照:保持策略
    • save命令:save 3600 1 :每隔3600秒有一个修改就会持久化一下
    • bgsave命令:Redis 会在后台异步进行快照操作, 快照同时还可以响应客户端请求。
    • 可以通过lastsave 命令获取最后一次成功执行快照的时间
    • stop-writes-on-bgsave-error:当Redis无法写入磁盘的话,直接关掉Redis的写操作。推荐yes.
    • rdbcompression:在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能。推荐yes.
  5. 持久化更新机制
    • 通过实验得知:redis通过启动到修改的计时以及修改次数去持久化。
    • 什么时候进行修改:时间间隔达到时才会修改。
    • 可能会出现达到修改条件后立刻执行的修改,原因:因为从启动服务或上次持久化操作,计时已经超过时间下限,但是修改次数还未超过,例如,我设置save 20 3然后启动redis之后40s左右时进行了3次修改,dump文件会立刻出现,因为40s时间早已达到20s的范围,然后立刻进行了三次修改,也满足了条件,所以立刻进行了持久化。而后我又紧跟着3次修改,之后立刻复制dump.rdb文件,关停redis服务,之后删除原dump文件,将我复制的dump备份文件更名为dump.rdb,之后发现其只保存了3次修改,紧随其后的3次修改需要再等20s才能进行持久化。
  6. rdb的备份:
    1. 先通过config get dir 查询rdb文件的目录
    2. 将*.rdb的文件拷贝到别的地方
    3. rdb的恢复
      • 关闭Redis
      • 先把备份的文件拷贝到工作目录下 cp dump2.rdb dump.rdb
      • 启动Redis, 备份数据会直接加载
  7. 优势:
    1. 适合大规模的数据恢复
    2. 对数据完整性和一致性要求不高更适合使用
    3. 节省磁盘空间
    4. 恢复速度快
  8. 劣势:
    1. Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
    2. 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
    3. 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。
  9. 总结:
    1. RDB是一个非常紧凑的文件
    2. RDB在保持RDB文件时父进程唯一需要做的就是fork一个子进程,接下来的全部工作由子进程来做,父进程不需要再做其他IO操作,所以RDB持久化方式可以最大化redis的性能。
    3. 与AOF相比,在恢复大的数据集的时候,RDB更快一些。
    4. 数据丢失风险大
    5. RDB需要经常fork子进程来保存数据集到硬盘上,当数据集比较大的时候,fork的过程非常耗时,可能会导致redis在一下毫秒级不能响应客户端请求。

AOF方式

  1. 概念:
    • 日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
  2. 持久化流程
    • 客户端的请求写命令会被append追加到AOF缓冲区内;
    • AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;
    • AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
    • Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;
  3. AOF开启配置:
    • AOF默认不开启
    • 修改默认的appendonly no,改为yes
    • 可以在redis.conf中配置文件名称,默认为 appendonly.aof
  4. AOF和RDB同时开启,redis听谁的?
    • AOF和RDB同时开启,系统默认取AOF的数据(数据不会存在丢失)
  5. 异常恢复:
    • aof文件受损失,比如你打开aof文件,在里面随便打字,会造成损坏。
    • redis-check-aof --fix appendonly.aof命令可以进行文件修复
  6. AOF同步频率设置
    • appendfsync always:始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好
    • appendfsync everysec:每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。
    • appendfsync no : redis不主动进行同步,把同步时机交给操作系统。
  7. 重写压缩
    • 概念:redis会把操作指令进行压缩,不考虑中间步骤,只保留生成最终结果的指令。
    • 条件:aof文件达到128MB的时候触发
    • 流程:fork子进程,开辟新空间进行写,完后替换原文件,就是写时复制技术
  8. 优势:
    • 备份机制更稳健,丢失数据概率更低。
    • 可读的日志文本,通过操作AOF稳健,可以处理误操作。
  9. 劣势:
    • 比起RDB占用更多的磁盘空间。
    • 恢复备份速度要慢。
    • 每次读写都同步的话,有一定的性能压力。
    • 存在个别Bug,造成恢复不能。
  10. 总结:
    • AOF文件是一个只追加日志的文件
    • Redis可以在AOF文件体积变得过大时,自动在后台对其进行重写操作
    • AOF文件有序的保存了对数据库执行的所有==写入==操作,这些写入操作以redis协议的格式保存,因此AOF文件内容非常容易被人读懂。
    • 对于相同的数据集来说,AOF文件的体积通常要大于RDB文件。
    • 根据使用的fsync策略,AOF的速度可能慢于RDB

二者总结

  1. 用哪个好
    • 官方推荐两个都启用。
    • 如果对数据不敏感,可以选单独用RDB。
    • 不建议单独用 AOF,因为可能会出现Bug。
    • 如果只是做纯内存缓存,可以都不用。

Redis主从复制

  1. 概念:
    • 主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主
  2. 作用:
    • 读写分离,性能扩展
    • 容灾快速恢复:当一个从服务器宕机之后,可以请求其他从服务器
  3. 操作:
    • 创建三个配置文件,使用include方式引入公共部分,配置各自端口
    • 可以使用info replication打印自己信息
    • 使用slaveof <ip><port>成为主机的从机
  4. 复制原理:
    1. 从机连接上之后,向主机发送同步数据请求
    2. 主机接收到同步请求之后,持久化操作,把rdb文件发给从机(全量复制
    3. 之后主机再有新信息,依次传递给从机(增量复制
    4. 只要重新连接主机,一次性全量复制
  5. 一主二从:
    1. 即使主机宕机,从机依然认主机为大哥,等待大哥复活。
    2. 从机不可以set
    3. 一个从机宕机的话,再次启动不会自动成为从机,而是会成为主机,还需要slaveof操作才可以成为主机,并且数据本身不会同步,需要成为从机之后,会自动同步。
  6. 薪火相传:
    1. 从服务器下面也可以再挂从服务器
    2. 特点:延续一主二从的特点
  7. 反客为主:
    1. 如果主服务器挂掉,然后从服务器晋升为主服务器的过程。
    2. 命令:slaveof no one将从机变成主机
  8. 哨兵模式(反客为主的自动版)
    1. 解释:后台自动监控主机是否故障,然后后台进行投票将从机晋升为主机。
    2. 操作:
      1. myredis目录下创建sentinel.conf配置文件
        • sentinel monitor mymaster 127.0.0.1 6379 1
        • 其中mymaster为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量。
      2. 启动哨兵:redis-sentinel /myredis/sentinel.conf
      3. 新主登基:
        1. 选择优先级小的:replica- priority
        2. 如果上一层相等,选择偏移量最大的,也就是数据最新的
        3. 如果上一层也一样,选择runid最小的,runid是运行时随机生成的40位id
      4. 群臣俯首:
        1. 挑选完新主之后,sentienl向原主的从服务器发送salveof的新命令,复制新master的数据
      5. 旧主俯首:
        1. 已下线的主机重新上线时,sentienl会向其发送slaveof操作,让其服从新主
      6. 复制延时:
        • 由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。
      7. Java实现:
        • private static JedisSentinelPool jedisSentinelPool=null;
          
          public static  Jedis getJedisFromSentinel(){
          	if(jedisSentinelPool==null){
                      Set<String> sentinelSet=new HashSet<>();
                      sentinelSet.add("192.168.11.103:26379");
          
                      JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
                      jedisPoolConfig.setMaxTotal(10); //最大可用连接数
          						jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
          						jedisPoolConfig.setMinIdle(5); //最小闲置连接数
          						jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
          						jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
          						jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong
          
          						jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
          						return jedisSentinelPool.getResource();
          	}else{
          					return jedisSentinelPool.getResource();
            }
          }
          

Redis集群

Redis应用问题解决

缓存穿透

  1. 现象:
    1. 应用服务器压力突然变大,大量请求达到
    2. redis命中率降低
    3. 一直查询数据库,数据库一直承受访问压力,导致崩溃
  2. 原因现象:
    1. redis查询不到数据
    2. 出现很多非正常url访问(一般因为遭受到恶意攻击)
  3. 解决方案:
    1. **对空值缓存:**如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
    2. **设置可访问的名单(白名单):**使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
    3. 采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
    4. **进行实时监控:**当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

缓存击穿

  1. 解释:key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
  2. 现象:
    1. 数据库访问压力瞬间增大,导致崩溃
    2. redis里并没有出现大量key过期
    3. redis正常运行
  3. 原因现象:
    1. redis某个热门搜索key过期,大量访问中都使用这个key
  4. 解决方案:
    0. key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。
    1. **预先设置热门数据:**在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长
    2. **实时调整:**现场监控哪些数据热门,实时调整key的过期时长
    3. 使用锁:
      1. 就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。
      2. 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key
      3. 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;
      4. 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。

缓存雪崩

  1. 描述:
    • key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大量并发的请求可能会瞬间把后端DB压垮。
    • 缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key正常访问
  2. 现象:
    1. 数据库压力变大,导致服务器、应用、redis全部崩溃
  3. 原因:
    1. 在极少的时间段内,查询大量key的集中过期情况
  4. 解决方案:
    1. **构建多级缓存架构:**nginx缓存 + redis缓存 +其他缓存(ehcache等)
    2. **使用锁或队列:**用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
    3. **设置过期标志更新缓存:**记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
    4. **将缓存失效时间分散开:**比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

分布式锁

  1. 描述:
    • 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
  2. 主流解决方案:
    1. 基于数据库实现分布式锁
    2. 基于缓存(Redis等)
    3. 基于Zookeeper
  3. 对比各自优缺点:
    1. 性能:redis最高
    2. 可靠性:zookeeper最高
  4. 基于redis实现分布式锁
    1. 上锁:setnx key value 只有在key为空时才可以赋值,所以设置值的同时上了锁
    2. 解锁:del key
    3. 超时机制:set key value nx ex time 设置值的同时设置了超时时间,ex后面的值就是超时时间
  5. Java代码:
    • @GetMapping("testLock")
          public void testLock() {
              //1获取锁,setne , 同时设置3秒过期,以避免中间出现异常,导致锁一直无法释放
              Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS);
              //2获取锁成功、查询num的值
              if (lock) {
                  Object value = redisTemplate.opsForValue().get("num");
                  //2.1判断num为空return
                  if (StringUtils.isEmpty(value)) {
                      return;
                  }
                  //2.2有值就转成成int
                  int num = Integer.parseInt(value + "");
                  //2.3把redis的num加1
                  redisTemplate.opsForValue().set("num", ++num);
                  //2.4释放锁,del
                  redisTemplate.delete("lock");
      
              } else {
                  //3获取锁失败、每隔0.1秒再获取
                  try {
                      Thread.sleep(100);
                      testLock();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      
    • 对于if段执行如果出现异常导致锁没有被释放的问题:设置超时自动释放
  6. 对于释放错锁,和释放别人锁的问题:
    • 描述:a上锁之后遇到卡顿,导致锁超时自动释放;之后b抢到锁进行操作;再之后a的服务器恢复,a操作完成之后释放了b的锁,导致接下来全部错位释放锁。c会抢到b的锁。
    • 解决:
      • UUID防止误删除:可以防止误删除链,但是依然存在超时误删的问题
        1. 第一步:set lock uuid nx ex 10,然后用uuid进行判断。
        2. 第二步:释放锁时候,判断当前uuid和锁里面uuid是否一致,一致则释放,不一致则不管
        3. 代码改造:
          String uuid = UUID.randomUUID().toString();
          
          //1获取锁,setne  ,同时设置3秒过期,以避免中间出现异常,导致锁一直无法释放
          Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
          
          ...
            
          //2.4释放锁,del
          String lockUuid = (String)redisTemplate.opsForValue().get("lock");
          if(uuid.equals(lockUuid)){
             redisTemplate.delete("lock");
           }
          
        4. 遗留问题:当比较uuid结果相等的时候,锁过期了,又会导致误删锁
      • LUA保证删除原子性操作:
        1. @GetMapping("testLockLua")
              public void testLockLua() {
                  //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
                  String uuid = UUID.randomUUID().toString();
                  //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
                  String skuId = "25"; // 访问skuId 为25号的商品 100008348542
                  String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
          
                  // 3 获取锁
                  Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
          
                  // 第一种: lock 与过期时间中间不写任何的代码。
                  // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
                  // 如果true
                  if (lock) {
                      // 执行的业务逻辑开始
                      // 获取缓存中的num 数据
                      Object value = redisTemplate.opsForValue().get("num");
                      // 如果是空直接返回
                      if (StringUtils.isEmpty(value)) {
                          return;
                      }
                      // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
                      int num = Integer.parseInt(value + "");
                      // 使num 每次+1 放入缓存
                      redisTemplate.opsForValue().set("num", String.valueOf(++num));
                      /*使用lua脚本来锁*/
                      // 定义lua 脚本
                      String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                      // 使用redis执行lua执行
                      DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
                      redisScript.setScriptText(script);
                      // 设置一下返回值类型 为Long
                      // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
                      // 那么返回字符串与0 会有发生错误。
                      redisScript.setResultType(Long.class);
                      // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
                      redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
                  } else {
                      // 其他线程等待
                      try {
                          // 睡眠
                          Thread.sleep(1000);
                          // 睡醒了之后,调用方法。
                          testLockLua();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              }
          
  7. 分布式锁四个条件:
    1. 互斥性。在任意时刻,只有一个客户端能持有锁。
    2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
    3. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
    4. 加锁和解锁必须具有原子性。

Redis6新功能

ACL(权限控制)

  1. 简介
    • Redis ACL是Access Control List(访问控制列表)的缩写,该功能允许根据可以执行的命令和可以访问的键来限制某些连接。
    • 在Redis 5版本之前,Redis 安全规则只有密码控制 还有通过rename 来调整高危命令比如 flushdb , KEYS* , shutdown 等。Redis 6 则提供ACL的功能对用户进行更细粒度的权限控制 :
      1. 接入权限:用户名和密码
      2. 可以执行的命令
      3. 可以操作的 KEY
      4. 参考官网:https://redis.io/topics/acl

IO多线程

  1. 简介
    • Redis6终于支撑多线程了,告别单线程了吗?
    • IO多线程其实指客户端交互部分网络IO交互处理模块多线程,而非执行命令多线程。Redis6执行命令依然是单线程。

工具支持 Cluster

之前老版Redis想要搭集群需要单独安装ruby环境,Redis 5 将 redis-trib.rb 的功能集成到 redis-cli 。另外官方 redis-benchmark 工具开始支持 cluster 模式了,通过多线程的方式对多个分片进行压测。

Redis新功能持续关注

Redis6新功能还有:

1、RESP3新的 Redis 通信协议:优化服务端与客户端之间通信

2、Client side caching客户端缓存:基于 RESP3 协议实现的客户端缓存功能。为了进一步提升缓存的性能,将客户端经常访问的数据cache到客户端。减少TCP网络交互。

3、Proxy集群代理模式:Proxy 功能,让 Cluster 拥有像单实例一样的接入方式,降低大家使用cluster的门槛。不过需要注意的是代理不改变 Cluster 的功能限制,不支持的命令还是不会支持,比如跨 slot 的多Key操作。

4、Modules API

Redis 6中模块API开发进展非常大,因为Redis Labs为了开发复杂的功能,从一开始就用上Redis模块。Redis可以变成一个框架,利用Modules来构建不同系统,而不需要从头开始写然后还要BSD许可。Redis一开始就是一个向编写各种系统开放的平台。

 类似资料: