Redis是一个高效的key-value数据库,同时拥有很多较为强大的功能。针对redis的过期key,可以来实现一些业务,这些业务的共性是不需要较高的实时性。由于Redis的过期事件可能会存在延迟,所以它无法实现实时性较高的功能。本文主要讲述的是跟据过期事件,实现数据统计与上报的功能。
本文中所讲述的场景主要是:一个服务下面有多台应用设备,设备会定时上报一些数据,此服务需要跟据上报数据定时计算得出一个结果,并将结果进行上报到中心服务进行持久化操作。
设计方案:将每条数据跟据设备唯一标识(这里称为设备编号)和时间戳进行拼接作为key,将上报信息作为value,存为string类型,过期时间可设为统计周期。同时将该设备编号作为hash的key,将每条key中的时间戳作为hash的field,并将上报信息作为hash的value。这样每台设备都会拥有一个唯一的Hash来存储该设备某段时间内的数据,当某一台设备的key过期时,可以在监听事件中监听该事件,并从过期key中获取设备号,利用Redis中hash的getAll方法,将该设备号作为参数,获取该段时间内对应设备的信息,再进行遍历统计,计算结果并进行上报。
方案利弊:
优势:适用于数据量不是特别巨大,实时性要求不是特别高的场景,实现较为简单。
劣势:过期时间可能稍有延迟,实时性较低。
2.2.1 更改Redis配置
```bash
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# Example 2: to get the stream of the expired keys subscribing to channel
# name __keyevent@0__:expired use:
notify-keyspace-events Ex
上述配置的最后一行原本是被注销掉,打开该注释即可。
或者在redis-cli下执行:
config set notify-keyspace-events Ex
2.2.2 编写RedisConfig配置文件
import com.baidu.capacity.common.constant.Constant;
import com.baidu.capacity.common.util.RedisUtils;
import com.baidu.capacity.common.push.provider.redisListener.StreamQualityListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
/**
* @Description: 流质量key过期处理线程
* @Author Marin
* @Date 2021/2/26 14:06
*/
@Configuration
public class RedisListenerConfiguration {
@Value("${spring.redis.database}")
private Integer db;
@Autowired
private RedisUtils redisUtils;
@Bean
public RedisMessageListenerContainer getListenerContainer(RedisConnectionFactory connectionFactory) {
//创建连接容器
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
//放入redis连接
container.setConnectionFactory(connectionFactory);
//指定监听类型
String patternTopic = "__keyspace@" + db + "__:" + Constant.EDGE_STREAM_QUALITY_INFO_LIST + "*";
//写入需要被监听的类型
Topic topic = new PatternTopic(patternTopic);
container.addMessageListener(new StreamQualityListener(redisUtils), topic);
return container;
}
}
2.2.3 过期事件监听流程
package com.baidu.capacity.common.push.provider.redisListener;
import com.alibaba.fastjson.JSON;
import com.baidu.capacity.common.constant.Constant;
import com.baidu.capacity.common.entity.StreamQuality;
import com.baidu.capacity.common.util.RedisLuaDistributedLockUtils;
import com.baidu.capacity.common.util.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.util.*;
/**
* @Description: 流质量key过期处理线程
* @Author Marin
* @Date 2021/2/26 16:03
*/
@Slf4j
public class StreamQualityListener implements MessageListener {
private RedisUtils redisUtils;
public StreamQualityListener(RedisUtils redisUtils) {
this.redisUtils = redisUtils;
}
@Override
public void onMessage(Message message, byte[] bytes) {
byte[] body = message.getBody();
byte[] channel = message.getChannel();
String opsType = new String(body);
String opsValue = new String(channel);
log.info(opsType);
log.info(new String(channel));
String lockVal = UUID.randomUUID().toString();
//获取过期key,统计过期key对应hash中的数据信息,跟据规则处理并将结果存入缓存
//__keyspace@9__:STREAM-QUALITY-REPORT-INFO-test1#test1-1614304386593
if ("expired".equals(opsType)) {
if (StringUtils.isNotBlank(opsValue) && opsValue.split(":").length > 1) {
//STREAM-QUALITY-REPORT-INFO-33010201021327688912#33010201021327688912-1614132409846
String key = opsValue.split(":")[1];
log.info("流质量信息key过期:{},过期时间为:{}",key,System.currentTimeMillis());
String commonkey = key.split("-")[4];
String deviceNum = commonkey.split("#")[0];
String channelNum = commonkey.split("#")[1];
try {
//加锁,防止同一台设备map未删除完而另一个key过期触发事件,导致生成多条记录结果
boolean lock = RedisLuaDistributedLockUtils.luaAcquireLock(commonkey, lockVal, 30);
if (lock) {
Long start = System.currentTimeMillis();
//lostRate总和
int count = 0;
//非空数据总条数
int totalNum = 0;
//计算结果,上报中心
StreamQuality streamQualityInfo = new StreamQuality();
streamQualityInfo.setDeviceNum(deviceNum);
streamQualityInfo.setChannelNum(channelNum);
streamQualityInfo.setCreateTime(new Date());
streamQualityInfo.setModifyTime(new Date());
Map<Object, Object> objectObjectMap = redisUtils.hGetAll(Constant.STREAM_QUALITY_HASH_KEY + commonkey);
for (Object object : objectObjectMap.keySet()) {
String hkey = (String) object;
String streamInfo = (String) objectObjectMap.get(hkey);
StreamQuality streamQuality = JSON.parseObject(streamInfo, StreamQuality.class);
if (StringUtils.isBlank(streamQuality.getDeviceNum()) || StringUtils.isBlank(streamQuality.getChannelNum()) || null == streamQuality.getLostRate()) {
log.error("设备号为:{},通道号为:{},lostRate的值为:{}不存在", deviceNum, JSON.toJSONString(streamQuality.getChannelNum()),JSON.toJSONString(streamQuality.getLostRate()));
//删除脏数据
redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
continue;
}
count += streamQuality.getLostRate();
totalNum++;
//同一设备和同一channelNum对应的unit_id一致
streamQualityInfo.setUnitId(streamQuality.getUnitId());
//删除已处理的Map中的记录
redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
//删除该批Map中所对应的未过期的记录
redisUtils.delete(Constant.EDGE_STREAM_QUALITY_INFO_LIST + commonkey + "-" + hkey);
log.info("删除设备编号为:{},key为:{}", Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
}
log.info("本次统计设备号为:{}的总数据为:{}", deviceNum, totalNum);
//TODO 此处可跟据规则计算结果并进行上报
//设置当前时间段内丢包率平均值
streamQualityInfo.setLostRate(count / (totalNum > 0 ? totalNum : 1));
log.info("本次上报结果为:{}", JSON.toJSONString(streamQualityInfo));
//将结果放入队列,开启线程轮询上报
redisUtils.lpush(Constant.STREAM_QUALITY_RESULT, JSON.toJSONString(streamQualityInfo));
log.info("处理结果时间为:{} ms", System.currentTimeMillis() - start);
}
} catch (Exception e) {
log.error("获取redis流质量信息出错,错误信息:{}", e);
} finally {
/** 释放分布式锁 */
RedisLuaDistributedLockUtils.luaReleaseLock(commonkey, lockVal);
}
}
}
}
}
2.2.4 监听事件的第二种实现
上述方案是监听redis指定库中的所有事件,包括插入、删除等,虽然能够实现,但是每次事件都要判断一次,难免有性能损耗。因此可以直接监听指定库的过期事件,根据key特征来进行对应事件的触发。具体实现如下:
1.配置文件改为:
package com.example.admin.redis;
import com.example.admin.listener.StreamQualityListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
/**
* @Description: 流质量key过期处理线程
* @Author Marin
* @Date 2021/2/27 10:06
*/
@Configuration
public class RedisConfiguration {
@Value("${spring.redis.database}")
private Integer db;
@Autowired
private RedisTemplate<String,String> redisUtils;
@Bean
public RedisMessageListenerContainer getListenerContainer(RedisConnectionFactory connectionFactory) {
//创建连接容器
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
//放入redis连接
container.setConnectionFactory(connectionFactory);
return container;
}
}
2.监听事件改为:
package com.example.admin.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
* @Description: 流质量key过期处理线程
* @Author Marin
* @Date 2021/2/27 11:04
*/
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
log.info("expiredKey:{}", expiredKey);
if (expiredKey.contains(Constant.EDGE_STREAM_QUALITY_INFO_LIST)) {
String commonkey = expiredKey.split("-")[4];
String deviceNum = commonkey.split("#")[0];
String channelNum = commonkey.split("#")[1];
try {
//加锁,防止同一台设备map未删除完而另一个key过期触发事件,导致生成多条记录结果
boolean lock = RedisLuaDistributedLockUtils.luaAcquireLock(commonkey, lockVal, 30);
if (lock) {
Long start = System.currentTimeMillis();
//lostRate总和
int count = 0;
//非空数据总条数
int totalNum = 0;
//计算结果,上报中心
StreamQuality streamQualityInfo = new StreamQuality();
streamQualityInfo.setDeviceNum(deviceNum);
streamQualityInfo.setChannelNum(channelNum);
streamQualityInfo.setCreateTime(new Date());
streamQualityInfo.setModifyTime(new Date());
Map<Object, Object> objectObjectMap = redisUtils.hGetAll(Constant.STREAM_QUALITY_HASH_KEY + commonkey);
for (Object object : objectObjectMap.keySet()) {
String hkey = (String) object;
String streamInfo = (String) objectObjectMap.get(hkey);
StreamQuality streamQuality = JSON.parseObject(streamInfo, StreamQuality.class);
if (StringUtils.isBlank(streamQuality.getDeviceNum()) || StringUtils.isBlank(streamQuality.getChannelNum()) || null == streamQuality.getLostRate()) {
log.error("设备号为:{},通道号为:{},lostRate的值为:{}不存在", deviceNum, JSON.toJSONString(streamQuality.getChannelNum()),JSON.toJSONString(streamQuality.getLostRate()));
//删除脏数据
redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
continue;
}
count += streamQuality.getLostRate();
totalNum++;
//同一设备和同一channelNum对应的unit_id一致
streamQualityInfo.setUnitId(streamQuality.getUnitId());
//删除已处理的Map中的记录
redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
//删除该批Map中所对应的未过期的记录
redisUtils.delete(Constant.EDGE_STREAM_QUALITY_INFO_LIST + commonkey + "-" + hkey);
log.info("删除设备编号为:{},key为:{}", Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
}
log.info("本次统计设备号为:{}的总数据为:{}", deviceNum, totalNum);
//TODO 此处可跟据规则计算结果并进行上报
//设置当前时间段内丢包率平均值
streamQualityInfo.setLostRate(count / (totalNum > 0 ? totalNum : 1));
log.info("本次上报结果为:{}", JSON.toJSONString(streamQualityInfo));
//将结果放入队列,开启线程轮询上报
redisUtils.lpush(Constant.STREAM_QUALITY_RESULT, JSON.toJSONString(streamQualityInfo));
log.info("处理结果时间为:{} ms", System.currentTimeMillis() - start);
}
} catch (Exception e) {
log.error("获取redis流质量信息出错,错误信息:{}", e);
} finally {
/** 释放分布式锁 */
RedisLuaDistributedLockUtils.luaReleaseLock(commonkey, lockVal);
}
}
}
}
1.鉴于Redis的默认过期删除策略,上述方法适用于同批次上报数据量不大的场景(数据量小于500左右,上报周期1s);
2.经测试,当同批数据量较大,超过1000或更多(上报周期为1s),会造成某段时间内同一台设备会生成两条结果,原因是当前设备的第一条数据过期时,处理流程未能及时删除第二条过期key,而数据仍不断写入,则第二条数据又触发了一次过期事件,导致生成两条结果。目前正在寻找优化方案,稍后更新。
3.上述两种实现方案的本质其实是一致,监听事件内的处理流程都是一样的,区别在于监听的事件范围,第二种方案是只监听所有的过期事件,根据key的特征来进行事件的执行,比方案一的效率稍高些。
1.https://juejin.cn/post/6844904158227595271
2.https://www.cnblogs.com/pinxiong/p/13288087.html
3.https://blog.csdn.net/shuizimuzhonglingf/article/details/102782014