当前位置: 首页 > 工具软件 > redis-mutex > 使用案例 >

Redis基础知识

冯鸿哲
2023-12-01

Redis基础知识

数据类型

字符串String、字典Hash、列表List、集合Set、有序集合SortedSet
#参考链接

雪崩

定义:同一时间大面积的key失效或者 Redis 故障宕机
解决方法:设置有效时间加上随机值

穿透

定义:访问redis和db不存在的数据
解决方法:加校验,也可使用布隆管理器(Bloom Filter)

击穿

定义:一个热点数据失效时间大量数据直接访问db
解决方法:设置热点数据不过期或者加上互斥锁

Redis分布式锁

先拿setnx来争抢锁,抢到之后,再用expire给锁加一个过期时间防止锁忘记了释放。

127.0.0.1:6379> setnx looi 999
(integer) 1
127.0.0.1:6379> expire looi 10
(integer) 1
127.0.0.1:6379> get looi
"999"
127.0.0.1:6379> ttl looi
(integer) -2


Redis的setnx命令是当key不存在时设置key,但setnx不能同时完成expire设置失效时长,不能保证setnx和expire的原子性。我们可以使用set命令完成setnx和expire的操作,并且这种操作是原子操作。

127.0.0.1:6379> set looi 996 ex 100 nx
OK
127.0.0.1:6379> get looi
"996"
127.0.0.1:6379> ttl looi
(integer) 94

代码中操作

$redis->set('testyy',9909,['EX'=>60,'NX']);

tp中

$handler=$redis->handler();(使用 handler 方法)
$handler->set('testyy',9909,['EX'=>60,'NX']);

解锁

解锁的逻辑更加简单,就是一段Lua的拼装,把Key做了删除。
LUA是原子性的,也比较简单,就是判断一下Key和我们参数是否相等,是的话就删除,返回成功1,0就是失败。

EVAL script numkeys key [key …] arg [arg …]

  • script: 参数是一段 Lua 5.1 脚本程序。脚本不必(也不应该)定义为一个 Lua 函数。
  • numkeys: 用于指定键名参数的个数。
  • key [key …]: 从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推)。
  • arg [arg …]: 附加参数,在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)
$script='
     if redis.call("get",KEYS[1]) == ARGV[1] then
          return redis.call("del",KEYS[1])
     else
          return 0
     end
     ';
     $redis->eval($script,[$lockKey,$randValue],1);
                

Redis异步消息队列

说道消息队列,你肯定会想到Kafka、Rabbitmq等消息中间件,这些专业的消息中间件提供了很多功能特性,当然他的部署使用维护都是比较麻烦的。如果你对消息队列没那么高要求,想要轻量级的,使用Redis就没错啦。

Redis通过list数据结构来实现消息队列.主要使用到如下命令:

  • lpush和rpush入队列

  • lpop和rpop出队列

  • blpop和brpop阻塞式出队列

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
 
//发送消息
$redis->lPush($list, $value);
 
//消费消息
while (true) {
    try {
        $msg = $redis->rPop($list);
        if (!$msg) {
            sleep(1);
        }
        //业务处理
     
    } catch (Exception $e) {
        echo $e->getMessage();
    }
}

上面代码会有个问题如果队列长时间是空的,那pop就不会不断的循环,这样会导致redis的QPS升高,影响性能。所以我们使用sleep来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是sleep会导致消息的处理延迟增加。这个问题我们可以通过blpop/brpop 来阻塞读取队列。

blpop/brpop在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。

还有一个需要注意的点是我们需要是用try/catch来进行异常捕获,如果一直阻塞在那里,Redis服务器一般会主动断开掉空链接,来减少闲置资源的占用。

$list = $redis->brPop('communal_app_log_list', 2);

一对多的情况使用订阅(pub/sub)

延时队列

你是否在做电商项目的时候会遇到如下场景:

订单下单后超过一小时用户未支付,需要关闭订单

订单的评论如果7天未评价,系统需要自动产生一条评论

这个时候我们就需要用到延时队列了,顾名思义就是需要延迟一段时间后执行。Redis可通过zset来实现。我们可以将有序集合的value设置为我们的消息任务,把value的score设置为消息的到期时间,然后轮询获取有序集合的中的到期消息进行处理。

实现代码如下:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
 
$redis->zAdd($delayQueue,$tts, $value);
 
while(true) {
    try{
        $msg = $redis->zRangeByScore($delayQueue,0,time(),0,1);
        //删除消息
        $ok = $redis.zrem($delayQueue,$msg);
        if($ok){
            //业务处理
        }
    } catch(\Exception $e) {
 
    }
}

这里又产生了一个问题,同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。解决办法:将 zrangebyscore和zrem使用lua脚本进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了

消息队列(streams)

  • 生产者
<?php
namespace app\index\controller;

use think\Controller;
use think\cache\driver\Redis;
//use think\session\driver\Redis;
use think\Db;

class Index extends Controller
{
    public function index()
    {
    		//lua脚本
    		$script='
             if redis.call("get",KEYS[1]) > "0"  then
                  return redis.call("DECR",KEYS[1]);
                   end         
                ';
                
            $res= $redis->eval($script,['name'],1);
            if($res || $res===0){
                $a=$redis->get('num');
                $a=$a+1;
                $redis->set('num',$a);
                $name="duzhili".$a;
                $data=[
                'name'=>$name
                ];
                $db=Db::table('test')->insertGetId($data);
                if($db){
                	$redis->xadd('mytss','*',['name'=>$name]);
				}
                
            }
    }
  • 消费者
<?php
namespace app\index\controller;

use think\Controller;
use think\cache\driver\Redis;
//use think\session\driver\Redis;
use think\Db;
use  think\worker\Server;
use Workerman\Lib\Timer;

class Logistics extends Server{

     protected $socket = 'websocket://0.0.0.0:8888';
	 public function onWorkerStart($scoket){
                $time=2;
                Timer::add($time,function(){
                
	                $redis=new Redis();
	                
	                $a= $redis->rawCommand('xreadgroup','group','onets','oneo','count','1','streams','mytss','>');
	               
	                if($a){
	                     $Id=$a[0][1][0][0];
	                     $username=$a[0][1][0][1][1];
	                     $data=[
	                     'username'=>$username
	                     ];
	
	                  $res=Db::table('one')->insert($data);
	                  if($res){
	                     $redis->rawCommand('xack','mytss','onets',$Id);
	                  }
	                }
                });
        }



自我理解(消息队列是在消息的传输过程中保存消息的容器,然后通过定时器去执行下一步逻辑)

数据一致性

最经典的缓存+数据库读写的模式,就是 Cache Aside Pattern

  • 读的时候,先读缓存,缓存没有的话,就读数据库,然后取出数据后放入缓存,同时返回响应。
  • 更新的时候,先更新数据库,然后再删除缓存。

内存淘汰策略

在Redis的redis.conf配置文件中,列出了8种策略:
(1)volatile-lru:从已设置过期时间的数据集中挑选最近最少使用的数据淘汰。
(2)volatile-ttl:从已设置过期时间的数据集中挑选将要过期的数据淘汰。
(3)volatile-random:从已设置过期时间的数据集中任意选择数据淘汰。
(4)volatile-lfu:从已设置过期时间的数据集挑选使用频率最低的数据淘汰。
(5)allkeys-lru:从数据集中挑选最近最少使用的数据淘汰
(6)allkeys-lfu:从数据集中挑选使用频率最低的数据淘汰。
(7)allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰
(8) no-enviction(驱逐):禁止驱逐数据,这也是默认策略。意思是当内存不足以容纳新入数据时,新写入操作就会报错,请求可以继续进行,线上任务也不能持续进行,采用no-enviction策略可以保证数据不被丢失。

这八种大体上可以分为4中,lru、lfu、random、ttl。

在集群模式下,Redis的key是如何寻址的?

#答案

互斥锁

//1、使用互斥锁
function get_redis_cache($ptid) {
    
    $ptid_key = GG_LIST_KEY . "_" . $ptid;
    $ptid_mutex = GG_KEY_MUTEX . "_" . $ptid;
    $value_redis = "";
//    $return = array();
    if(class_exists('Redis')) {
        $redis = new Redis();
        $res = $redis->connect(REDIS_HOST,REDIS_PORT,REDIS_TIMEOUT);//两秒超时连接
    }
    if($res){
        $auth = 1;
        if(REDIS_AUTH){
            $auth = $redis->auth(REDIS_AUTH); //设置密码     
        }        
    }
    if($auth){
        $value_redis = $redis->get($ptid_key);
        if($value_redis == null){
          //设置3min的超时,防止del操作失败的时候,下次缓存过期一直不能load db
          if ($redis->set($ptid_mutex, 1, Array('nx', 'ex'=>60)) == 1) {  //代表设置成功
                    $value_db = get_db_gg($ptid);
                    $value_redis =  json_encode($value_db);
                    $redis->set($ptid_key, $value_redis);
                    $redis->del($ptid_mutex);
              }else {  //这个时候代表同时候的其他线程已经load db并回设到缓存了,这时候重试获取缓存值即可
                    sleep(3);
                    $value_redis = get_redis_cache($ptid);  //重试    
                    return $value_redis;
              }                        
        }        
    }
    return json_decode($value_redis,1);
}

一般避免以上情况发生我们从三个时间段去分析下:

事前:Redis 高可用,主从+哨兵,Redis cluster,避免全盘崩溃。

事中:本地 ehcache 缓存 + Hystrix 限流+降级,避免MySQL 被打死。

事后:Redis 持久化 RDB+AOF,一旦重启,自动从磁盘上加载数据,快速恢复缓存数据。

加分项

Redis Module,
像BloomFilter,
RedisSearch,
Redis-ML

#推荐链接

以上是对redis的一些总结,如有错误欢迎指出

 类似资料: