Redis从入门到放弃之Redis key过期监听

朱乐逸
2023-12-01

1.概述

Redis是一个高效的key-value数据库,同时拥有很多较为强大的功能。针对redis的过期key,可以来实现一些业务,这些业务的共性是不需要较高的实时性。由于Redis的过期事件可能会存在延迟,所以它无法实现实时性较高的功能。本文主要讲述的是跟据过期事件,实现数据统计与上报的功能。

2.监听事件的两种实现

2.1 业务背景

本文中所讲述的场景主要是:一个服务下面有多台应用设备,设备会定时上报一些数据,此服务需要跟据上报数据定时计算得出一个结果,并将结果进行上报到中心服务进行持久化操作。
设计方案:将每条数据跟据设备唯一标识(这里称为设备编号)和时间戳进行拼接作为key,将上报信息作为value,存为string类型,过期时间可设为统计周期。同时将该设备编号作为hash的key,将每条key中的时间戳作为hash的field,并将上报信息作为hash的value。这样每台设备都会拥有一个唯一的Hash来存储该设备某段时间内的数据,当某一台设备的key过期时,可以在监听事件中监听该事件,并从过期key中获取设备号,利用Redis中hash的getAll方法,将该设备号作为参数,获取该段时间内对应设备的信息,再进行遍历统计,计算结果并进行上报。
方案利弊:
优势:适用于数据量不是特别巨大,实时性要求不是特别高的场景,实现较为简单。
劣势:过期时间可能稍有延迟,实时性较低。

2.2 代码实现

2.2.1 更改Redis配置


```bash
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# Example 2: to get the stream of the expired keys subscribing to channel
# name __keyevent@0__:expired use:
notify-keyspace-events Ex

上述配置的最后一行原本是被注销掉,打开该注释即可。
或者在redis-cli下执行:

config set notify-keyspace-events Ex

2.2.2 编写RedisConfig配置文件

import com.baidu.capacity.common.constant.Constant;
import com.baidu.capacity.common.util.RedisUtils;
import com.baidu.capacity.common.push.provider.redisListener.StreamQualityListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;

/**
 * @Description: 流质量key过期处理线程
 * @Author Marin
 * @Date 2021/2/26 14:06
 */
@Configuration
public class RedisListenerConfiguration {

    @Value("${spring.redis.database}")
    private Integer db;

    @Autowired
    private RedisUtils redisUtils;

    @Bean
    public RedisMessageListenerContainer getListenerContainer(RedisConnectionFactory connectionFactory) {

        //创建连接容器
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        //放入redis连接
        container.setConnectionFactory(connectionFactory);
        
		//指定监听类型
        String patternTopic = "__keyspace@" + db + "__:" + Constant.EDGE_STREAM_QUALITY_INFO_LIST + "*";

        //写入需要被监听的类型
        Topic topic = new PatternTopic(patternTopic);
        container.addMessageListener(new StreamQualityListener(redisUtils), topic);
        return container;
    }
}

2.2.3 过期事件监听流程

package com.baidu.capacity.common.push.provider.redisListener;

import com.alibaba.fastjson.JSON;
import com.baidu.capacity.common.constant.Constant;
import com.baidu.capacity.common.entity.StreamQuality;
import com.baidu.capacity.common.util.RedisLuaDistributedLockUtils;
import com.baidu.capacity.common.util.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import java.util.*;

/**
 * @Description: 流质量key过期处理线程
 * @Author Marin
 * @Date 2021/2/26 16:03
 */
@Slf4j
public class StreamQualityListener implements MessageListener {

    private RedisUtils redisUtils;

    public StreamQualityListener(RedisUtils redisUtils) {
        this.redisUtils = redisUtils;
    }

    @Override
    public void onMessage(Message message, byte[] bytes) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String opsType = new String(body);
        String opsValue = new String(channel);
        log.info(opsType);
        log.info(new String(channel));
        String lockVal = UUID.randomUUID().toString();
        //获取过期key,统计过期key对应hash中的数据信息,跟据规则处理并将结果存入缓存
        //__keyspace@9__:STREAM-QUALITY-REPORT-INFO-test1#test1-1614304386593
        if ("expired".equals(opsType)) {
            if (StringUtils.isNotBlank(opsValue) && opsValue.split(":").length > 1) {
                //STREAM-QUALITY-REPORT-INFO-33010201021327688912#33010201021327688912-1614132409846
                String key = opsValue.split(":")[1];
                log.info("流质量信息key过期:{},过期时间为:{}",key,System.currentTimeMillis());
                String commonkey = key.split("-")[4];
                String deviceNum = commonkey.split("#")[0];
                String channelNum = commonkey.split("#")[1];
                try {
                    //加锁,防止同一台设备map未删除完而另一个key过期触发事件,导致生成多条记录结果
                    boolean lock = RedisLuaDistributedLockUtils.luaAcquireLock(commonkey, lockVal, 30);
                    if (lock) {
                        Long start = System.currentTimeMillis();
                        //lostRate总和
                        int count = 0;
                        //非空数据总条数
                        int totalNum = 0;
                        //计算结果,上报中心
                        StreamQuality streamQualityInfo = new StreamQuality();
                        streamQualityInfo.setDeviceNum(deviceNum);
                        streamQualityInfo.setChannelNum(channelNum);
                        streamQualityInfo.setCreateTime(new Date());
                        streamQualityInfo.setModifyTime(new Date());
                        Map<Object, Object> objectObjectMap = redisUtils.hGetAll(Constant.STREAM_QUALITY_HASH_KEY + commonkey);
                        for (Object object : objectObjectMap.keySet()) {
                            String hkey = (String) object;
                            String streamInfo = (String) objectObjectMap.get(hkey);
                            StreamQuality streamQuality = JSON.parseObject(streamInfo, StreamQuality.class);
                            if (StringUtils.isBlank(streamQuality.getDeviceNum()) || StringUtils.isBlank(streamQuality.getChannelNum()) || null == streamQuality.getLostRate()) {
                                log.error("设备号为:{},通道号为:{},lostRate的值为:{}不存在", deviceNum, JSON.toJSONString(streamQuality.getChannelNum()),JSON.toJSONString(streamQuality.getLostRate()));
                                //删除脏数据
                                redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
                                continue;
                            }
                            count += streamQuality.getLostRate();
                            totalNum++;
                            //同一设备和同一channelNum对应的unit_id一致
                            streamQualityInfo.setUnitId(streamQuality.getUnitId());

                            //删除已处理的Map中的记录
                            redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
                            //删除该批Map中所对应的未过期的记录
                            redisUtils.delete(Constant.EDGE_STREAM_QUALITY_INFO_LIST + commonkey + "-" + hkey);
                            log.info("删除设备编号为:{},key为:{}", Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
                        }
                        log.info("本次统计设备号为:{}的总数据为:{}", deviceNum, totalNum);
                        //TODO 此处可跟据规则计算结果并进行上报
                        //设置当前时间段内丢包率平均值
                        streamQualityInfo.setLostRate(count / (totalNum > 0 ? totalNum : 1));
                        log.info("本次上报结果为:{}", JSON.toJSONString(streamQualityInfo));
                        //将结果放入队列,开启线程轮询上报
                        redisUtils.lpush(Constant.STREAM_QUALITY_RESULT, JSON.toJSONString(streamQualityInfo));
                        log.info("处理结果时间为:{} ms", System.currentTimeMillis() - start);
                    }
                } catch (Exception e) {
                    log.error("获取redis流质量信息出错,错误信息:{}", e);
                } finally {
                    /** 释放分布式锁 */
                    RedisLuaDistributedLockUtils.luaReleaseLock(commonkey, lockVal);
                }
            }
        }
    }
}

2.2.4 监听事件的第二种实现
上述方案是监听redis指定库中的所有事件,包括插入、删除等,虽然能够实现,但是每次事件都要判断一次,难免有性能损耗。因此可以直接监听指定库的过期事件,根据key特征来进行对应事件的触发。具体实现如下:
1.配置文件改为:

package com.example.admin.redis;

import com.example.admin.listener.StreamQualityListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;

/**
 * @Description: 流质量key过期处理线程
 * @Author Marin
 * @Date 2021/2/27 10:06
 */
@Configuration
public class RedisConfiguration {

    @Value("${spring.redis.database}")
    private Integer db;

    @Autowired
    private RedisTemplate<String,String> redisUtils;

    @Bean
    public RedisMessageListenerContainer getListenerContainer(RedisConnectionFactory connectionFactory) {

        //创建连接容器
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        //放入redis连接
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

2.监听事件改为:

package com.example.admin.redis;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

/**
 * @Description: 流质量key过期处理线程
 * @Author Marin
 * @Date 2021/2/27 11:04
 */
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        log.info("expiredKey:{}", expiredKey);
        if (expiredKey.contains(Constant.EDGE_STREAM_QUALITY_INFO_LIST)) {
            String commonkey = expiredKey.split("-")[4];
            String deviceNum = commonkey.split("#")[0];
            String channelNum = commonkey.split("#")[1];
            try {
                //加锁,防止同一台设备map未删除完而另一个key过期触发事件,导致生成多条记录结果
                boolean lock = RedisLuaDistributedLockUtils.luaAcquireLock(commonkey, lockVal, 30);
                if (lock) {
                    Long start = System.currentTimeMillis();
                    //lostRate总和
                    int count = 0;
                    //非空数据总条数
                    int totalNum = 0;
                    //计算结果,上报中心
                    StreamQuality streamQualityInfo = new StreamQuality();
                    streamQualityInfo.setDeviceNum(deviceNum);
                    streamQualityInfo.setChannelNum(channelNum);
                    streamQualityInfo.setCreateTime(new Date());
                    streamQualityInfo.setModifyTime(new Date());
                    Map<Object, Object> objectObjectMap = redisUtils.hGetAll(Constant.STREAM_QUALITY_HASH_KEY + commonkey);
                    for (Object object : objectObjectMap.keySet()) {
                        String hkey = (String) object;
                        String streamInfo = (String) objectObjectMap.get(hkey);
                        StreamQuality streamQuality = JSON.parseObject(streamInfo, StreamQuality.class);
                        if (StringUtils.isBlank(streamQuality.getDeviceNum()) || StringUtils.isBlank(streamQuality.getChannelNum()) || null == streamQuality.getLostRate()) {
                            log.error("设备号为:{},通道号为:{},lostRate的值为:{}不存在", deviceNum, JSON.toJSONString(streamQuality.getChannelNum()),JSON.toJSONString(streamQuality.getLostRate()));
                            //删除脏数据
                            redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
                            continue;
                        }
                        count += streamQuality.getLostRate();
                        totalNum++;
                        //同一设备和同一channelNum对应的unit_id一致
                        streamQualityInfo.setUnitId(streamQuality.getUnitId());

                        //删除已处理的Map中的记录
                        redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
                        //删除该批Map中所对应的未过期的记录
                        redisUtils.delete(Constant.EDGE_STREAM_QUALITY_INFO_LIST + commonkey + "-" + hkey);
                        log.info("删除设备编号为:{},key为:{}", Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
                    }
                    log.info("本次统计设备号为:{}的总数据为:{}", deviceNum, totalNum);
                    //TODO 此处可跟据规则计算结果并进行上报
                    //设置当前时间段内丢包率平均值
                    streamQualityInfo.setLostRate(count / (totalNum > 0 ? totalNum : 1));
                    log.info("本次上报结果为:{}", JSON.toJSONString(streamQualityInfo));
                    //将结果放入队列,开启线程轮询上报
                    redisUtils.lpush(Constant.STREAM_QUALITY_RESULT, JSON.toJSONString(streamQualityInfo));
                    log.info("处理结果时间为:{} ms", System.currentTimeMillis() - start);
                }
            } catch (Exception e) {
                log.error("获取redis流质量信息出错,错误信息:{}", e);
            } finally {
                /** 释放分布式锁 */
                RedisLuaDistributedLockUtils.luaReleaseLock(commonkey, lockVal);
            }
        }
    }
}

3.小结

1.鉴于Redis的默认过期删除策略,上述方法适用于同批次上报数据量不大的场景(数据量小于500左右,上报周期1s);
2.经测试,当同批数据量较大,超过1000或更多(上报周期为1s),会造成某段时间内同一台设备会生成两条结果,原因是当前设备的第一条数据过期时,处理流程未能及时删除第二条过期key,而数据仍不断写入,则第二条数据又触发了一次过期事件,导致生成两条结果。目前正在寻找优化方案,稍后更新。
3.上述两种实现方案的本质其实是一致,监听事件内的处理流程都是一样的,区别在于监听的事件范围,第二种方案是只监听所有的过期事件,根据key的特征来进行事件的执行,比方案一的效率稍高些。

4.参考文献

1.https://juejin.cn/post/6844904158227595271
2.https://www.cnblogs.com/pinxiong/p/13288087.html
3.https://blog.csdn.net/shuizimuzhonglingf/article/details/102782014

 类似资料: