nodejs实现redis ORM。即操作数据库的方式操作redis。
实现思路:
需要保存一条用户数据 name='test',age=22,sex=0
1.获取自增ID,自增ID=1
2.redis key=redis_proxy_user_1,生成规则为前缀+表名+自增ID,保存为redis的hash数据类型,即:
hmset redis_proxy_user_1 name "test" age 22 sex 0
3.生成字段-值的映射关系,保存为redis的set数据类型,key生成规则为前缀+表名+字段+值,值为自增ID,即:
sadd redis_set_proxy_user_name_test 1;
sadd redis_set_proxy_user_age_22 1;
sadd redis_set_proxy_user_sex_0 1;
4.判断字段值的类型,如果为数字类型,需要添加分数映射,用于区间查询,保存为redis的sorted set数据类型,key生成规则为前缀+表名+字段,成员为自增ID,分数为字段值,这样就可以通过分数的区间进行查询,这里age和sex是数字类型,即:
zadd redis_sort_set_user_age 22 1;
zadd redis_sort_set_user_sex 0 1;
添加数据就完成了。
查询的思路:
需要查询name='test'的用户
1.组装redis key,查询set是否存在,key=redis_set_proxy_user_name_test,即:
smembers redis_set_proxy_user_name_test
找到[1]
2.根据上面的查询结果,用找到的自增ID组装redis key,查询redis hash中的数据,key的生成规则为前缀+表名+自增ID,key=redis_proxy_user_1,即:
hmget redis_proxy_user_1 name age sex
查询结束。
如果是需要查询age>=22的用户
1.查询sort set,判断分数在22到正无穷的所有成员,组装redis key,生成规则前缀+表名+字段,key=redis_sort_set_user_age,即:
zrangebyscore redis_sort_set_user_age 22 +inf
找到[1]
2.根据上面的查询结果,用找到的自增ID组装redis key,查询redis hash中的数据,key的生成规则为前缀+表名+自增ID,key=redis_proxy_user_1,即:
hmget redis_proxy_user_1 name age sex
查询结束。
代码实现了插入数据时可设置添加查询索引字段,多条件查询,分页,排序,占位符,修改,删除。功能函数类redisDB.js:
'use strict';
const F = require('../common/function');
const _ = require('underscore');
_.str = require('underscore.string');
module.exports = function (app, commonManager) {
// Manager registry taken from commonManager; the methods below reach Redis through managerMap.redis.
let managerMap = commonManager.mgr_map;
// Alias of `this` for nested callbacks (used by the sort comparator in queryRedisDB).
let that = this;
// Key template of the hash that stores one record
this.key_prefix_hash = 'redis_db_key_hash_%s_%s'; // table increment
// Key template of the set that maps one field/value pair to record ids
this.key_prefix_set = 'redis_db_key_set_%s_%s_%s'; // table field value
// Key of the hash holding the auto-increment counters (insertRedisDB uses the table name as its field)
this.key_prefix_has_inc = 'redis_db_key_hash_inc';
// Auto-increment key for temporary range result sets (not referenced by the code visible here)
this.key_prefix_tmp_set_inc = 'redis_db_key_tmp_set_inc';
// Key template of the sorted set used for range queries on one field
this.key_prefix_sort_set = 'redis_db_key_sort_set_%s_%s'; // table field
// Build the key of the hash that stores one record by filling the hash
// template with the table name and the record's auto-increment id.
this.getKeyHash = function (table, increment) {
    const template_args = [table, increment];
    return _.str.vsprintf(this.key_prefix_hash, template_args);
};
// Build the key of the set that maps one field/value pair of a table to record ids.
this.getKeySet = function (table, field, value) {
    const template_args = [table, field, value];
    return _.str.vsprintf(this.key_prefix_set, template_args);
};
// Build the key of the sorted set used for range queries on one field of a table.
this.getKeySortSet = function (table, field) {
    const template_args = [table, field];
    return _.str.vsprintf(this.key_prefix_sort_set, template_args);
};
/**
 * Choose the expiry to apply to a key that may already exist.
 * Reads the key's current TTL through managerMap.redis.ttl and returns
 * `expire` when the current TTL is lower than `expire` or when `expire`
 * is -1; otherwise the current TTL is returned.
 * NOTE(review): Redis TTL is understood to report -1 for a key without
 * expiry and -2 for a missing key, so an existing persistent key would be
 * given the finite `expire` here - confirm that this is intended.
 * @param key    redis key to inspect
 * @param expire requested expiry (callers in this file document -1 as "never expire")
 * @returns {int}
 */
this.getBestExpire = function* (key, expire) {
    const current_ttl = yield managerMap.redis.ttl(key);
    const use_requested = current_ttl < expire || expire == -1;
    return use_requested ? expire : current_ttl;
};
/**
 * Intersect several arrays.
 * With no arrays an empty array is returned; with exactly one array that
 * array is returned as-is (same reference, not de-duplicated). Otherwise
 * the result holds every distinct element of the shortest array that is
 * present in all other arrays, in the order of the shortest array.
 * Membership is checked through Set lookups instead of repeated indexOf
 * scans, so the cost grows linearly with the total input size.
 * @param arrays array of arrays
 * @returns {array}
 */
this.intersection = function (arrays) {
    if (arrays.length === 0) return [];
    if (arrays.length === 1) return arrays[0];
    // Drive the loop from the shortest array (first one on ties): it bounds the result size.
    let shortest_index = 0;
    for (let i = 1; i < arrays.length; i++) {
        if (arrays[i].length < arrays[shortest_index].length) {
            shortest_index = i;
        }
    }
    const lookups = [];
    for (let i = 0; i < arrays.length; i++) {
        if (i !== shortest_index) {
            lookups.push(new Set(arrays[i]));
        }
    }
    const already_added = new Set();
    const intersection_array = [];
    const candidates = arrays[shortest_index];
    for (let i = 0; i < candidates.length; i++) {
        const candidate = candidates[i];
        if (already_added.has(candidate)) continue;
        const in_all = lookups.every(function (lookup) {
            return lookup.has(candidate);
        });
        if (in_all) {
            already_added.add(candidate);
            intersection_array.push(candidate);
        }
    }
    return intersection_array;
};
/**
 * Tell whether a value is numeric.
 * Accepts finite numbers and strings holding a plain decimal number with an
 * optional leading minus sign and an optional fractional part ("12", "-3",
 * "4.5"). The previous digits-only scan rejected negative and fractional
 * values even though insertRedisDB indexes every value of type number, and
 * it accepted non-string input only by accident (no `length` to iterate).
 * @param str value to test (usually a string read back from Redis)
 * @returns {bool}
 */
this.checkIsNumber = function (str) {
    if (F.isNull(str)) return false;
    if (typeof str === 'number') return Number.isFinite(str);
    if (typeof str !== 'string') return false;
    return /^-?\d+(\.\d+)?$/.test(str);
};
/**
 * Resolve a where clause to the list of matching record ids.
 * Conditions are joined by the keyword `and` (surrounded by whitespace, any
 * letter case). Each condition is `field <op> value` with op one of
 * = > < >= <=; the operator is the first one that appears in the condition.
 * "=" conditions are answered with one SINTER over the field/value sets,
 * every other operator with ZRANGEBYSCORE on the field's sorted set; the
 * final result is the intersection of all partial results.
 * A literal value that itself contains " and " must be passed through a `?`
 * placeholder.
 * @param table
 * @param option
 * option contains:
 * where   string, e.g. "name = ? and age > ?", required
 * values  array with one entry per `?` placeholder, e.g. ['fdl', 2]; it is copied, not consumed
 * @returns {array} matching record ids
 */
this.queryResultFromRedisDB = function* (table, option) {
    // Work on a copy so the caller's array is not emptied by shift().
    const pending_values = Array.isArray(option.values) ? option.values.slice() : [];
    const key_array = [];          // set keys of the "=" conditions
    const res_between_array = [];  // one id list per range condition
    // Split on the keyword only, so names or values containing "and" (e.g. "brand") stay intact.
    const where_array = option.where.split(/\s+and\s+/i);
    for (let i = 0; i < where_array.length; i++) {
        // Lazy prefix: the field is everything before the first operator.
        const parsed = /^([\s\S]*?)(>=|<=|>|<|=)([\s\S]*)$/.exec(where_array[i]);
        if (parsed === null) {
            F.throwErr('where sql err.');
        }
        const field = _.str.trim(parsed[1], ' ');
        const operator = parsed[2];
        let value = _.str.trim(parsed[3], [' ', '"', '\'']);
        if (value === '?') {
            if (pending_values.length === 0) {
                // More placeholders than supplied values.
                F.throwErr('params err.');
            }
            value = pending_values.shift();
        }
        if (operator === '=') {
            key_array.push(this.getKeySet(table, field, value));
            continue;
        }
        let option_min;
        let option_max;
        switch (operator) {
            case '>':
                option_min = '(' + value; // "(" marks an exclusive bound
                option_max = '+inf';
                break;
            case '<':
                option_min = '-inf';
                option_max = '(' + value;
                break;
            case '>=':
                option_min = value;
                option_max = '+inf';
                break;
            case '<=':
                option_min = '-inf';
                option_max = value;
                break;
        }
        res_between_array.push(yield managerMap.redis.zrangebyscore(this.getKeySortSet(table, field), option_min, option_max));
    }
    const query_res = [];
    if (key_array.length > 0) {
        query_res.push(yield managerMap.redis.sinter(key_array));
    }
    query_res.push.apply(query_res, res_between_array);
    // Intersect the equality result with every range result.
    return this.intersection(query_res);
};
/**
 * Insert one record.
 * Takes the next id from the per-table counter, stores the record as a hash
 * and, for every indexed field, adds the id to the field/value set; fields
 * whose value has type number are also added to the field's sorted set with
 * the value as score. Each key gets the expiry chosen by getBestExpire.
 * @param table
 * @param option
 * option contains:
 * data    record object
 * expire  expiry, -1 means never expire
 * index   optional comma separated list of fields to index, e.g. 'name,age'; when empty every field is indexed
 * @returns {} object with the hash key of the new record (key_hash)
 */
this.insertRedisDB = function* (table, option) {
    const params_missing = F.isNull(table) || F.isNull(option) || F.isNull(option.data) || F.isNull(option.expire);
    if (params_missing) F.throwErr('params err.');

    const record_id = yield managerMap.redis.hincrby(this.key_prefix_has_inc, table);
    const key_hash = this.getKeyHash(table, record_id);

    // -1 means "no index list given": every field is indexed.
    let index_fields = -1;
    if (!F.isNull(option.index)) {
        index_fields = option.index.split(',').map(function (name) {
            return _.str.trim(name, ' ');
        });
    }

    const hash_args = []; // flat field, value, field, value ... list for hmset
    for (let field in option.data) {
        const field_value = option.data[field];
        hash_args.push(field, field_value);

        const not_indexed = index_fields != -1 && index_fields.indexOf(field) == -1;
        if (not_indexed) continue;

        // field/value -> id mapping
        const key_set = this.getKeySet(table, field, field_value);
        const ttl_set = yield this.getBestExpire(key_set, option.expire);
        yield managerMap.redis.sadd(key_set, record_id, ttl_set);

        // number values additionally get a score entry for range queries
        if (typeof field_value == 'number') {
            const key_sorted = this.getKeySortSet(table, field);
            const ttl_sorted = yield this.getBestExpire(key_sorted, option.expire);
            yield managerMap.redis.zadd(key_sorted, record_id, field_value, ttl_sorted);
        }
    }

    const ttl_hash = yield this.getBestExpire(key_hash, option.expire);
    yield managerMap.redis.hmset(key_hash, hash_args, ttl_hash);
    return {key_hash: key_hash};
};
/**
 * Query records.
 * Resolves the where clause to ids, loads the requested fields of every
 * record whose hash still exists, optionally sorts and paginates.
 * Fixes over the previous version: the sort comparator now returns a signed
 * number (a boolean result does not give a reliable order), numeric values
 * are compared with Number() instead of radix-less parseInt, `limit: 1`
 * returns an empty object instead of undefined when no record is left, and
 * a single-number limit means "the first N rows".
 * @param table
 * @param option
 * option contains:
 * fields  string, e.g. "name,age", required
 * where   string, e.g. "name = ? and age > ?", required, supports = > < >= <=
 * values  array, e.g. ['fdl', 2]
 * order   string "field asc|desc"; direction defaults to desc; the field must be listed in fields
 * limit   1 returns a single object; "offset,count" or a single count returns a slice
 * @returns {*} array of records, or one record object when limit is 1
 */
this.queryRedisDB = function* (table, option) {
    if (F.isNull(table) || F.isNull(option) || F.isNull(option.fields) || F.isNull(option.where)) F.throwErr('params err.');
    const want_single = !F.isNull(option.limit) && option.limit == 1;
    const query_res = yield this.queryResultFromRedisDB(table, option);
    if (F.isNull(query_res)) {
        return want_single ? {} : [];
    }
    const fields = option.fields.split(',').map(function (name) {
        return _.str.trim(name, ' ');
    });
    const list = [];
    for (let i = 0; i < query_res.length; i++) {
        const key_hash = this.getKeyHash(table, query_res[i]);
        // Index entries can outlive their record; skip ids whose hash is gone.
        const has_set = yield managerMap.redis.exists(key_hash);
        if (has_set == false) continue;
        const data = yield managerMap.redis.hmget(key_hash, fields);
        const item = {};
        for (let j = 0; j < fields.length; j++) {
            item[fields[j]] = data[j];
        }
        list.push(item);
    }
    if (!F.isNull(option.order)) {
        const order_array = _.str.trim(option.order, ' ').split(/\s+/);
        const field = order_array[0];
        const direction = String(order_array[1] || '').toLowerCase() === 'asc' ? 1 : -1;
        list.sort(function (objectA, objectB) {
            const a = objectA[field];
            const b = objectB[field];
            let diff;
            if (that.checkIsNumber(a) === true && that.checkIsNumber(b) === true) {
                diff = Number(a) - Number(b);
            } else if (a > b) {
                diff = 1;
            } else if (a < b) {
                diff = -1;
            } else {
                diff = 0;
            }
            return direction * diff;
        });
    }
    if (F.isNull(option.limit)) {
        return list;
    }
    if (want_single) {
        // Every matched hash may have expired; keep the "single object" return shape.
        return list.length > 0 ? list[0] : {};
    }
    const limit_array = String(option.limit).split(',');
    let offset = 0;
    let count;
    if (limit_array.length === 1) {
        count = parseInt(_.str.trim(limit_array[0], ' '), 10);
    } else {
        offset = parseInt(_.str.trim(limit_array[0], ' '), 10);
        count = parseInt(_.str.trim(limit_array[1], ' '), 10);
    }
    return list.slice(offset, offset + count);
};
/**
 * Update records.
 * Every matching record is read, merged with option.data, deleted and
 * inserted again under a new id through insertRedisDB.
 * Fixes over the previous version:
 * - the old id is removed from the sorted sets of EVERY matched record
 *   (the de-duplication map used to skip all records after the first one);
 * - option.index is forwarded to the re-insert instead of being ignored;
 * - untouched fields whose stored text is exactly a finite number are
 *   re-inserted as numbers so they keep their sorted-set (range) index;
 *   the text written back to the hash is identical.
 * NOTE(review): the field/value set of each updated field is deleted as a
 * whole key, which also drops the index entries of other records sharing
 * that old value. Removing only the id needs SREM; no such wrapper is
 * visible here - confirm and switch if managerMap.redis offers one.
 * @param table
 * @param option
 * option contains:
 * where   string, e.g. "name = ? and age > ?", required, supports = > < >= <=
 * values  array, e.g. ['fdl', 2]
 * data    object with the fields to change or add
 * expire  expiry, -1 means never expire
 * index   optional comma separated list of fields to index, e.g. 'name,age'
 * @returns {} true when nothing matched, otherwise an object with del_key_array
 */
this.updateRedisDB = function* (table, option) {
    if (F.isNull(table) || F.isNull(option) || F.isNull(option.where) || F.isNull(option.data) || F.isNull(option.expire)) {
        F.throwErr('params err.');
    }
    const query_res = yield this.queryResultFromRedisDB(table, option);
    if (F.isNull(query_res)) return true;

    const hasOwn = Object.prototype.hasOwnProperty;
    // True when a stored string is the exact text of a finite number ("22", "-3.5"), unlike "007".
    const isNumericText = function (raw) {
        return typeof raw === 'string' && raw !== '' && Number.isFinite(Number(raw)) && String(Number(raw)) === raw;
    };

    const del_key_array = [];
    const del_key_record = {}; // set keys already queued for deletion
    const update_data = [];
    for (let i = 0; i < query_res.length; i++) {
        const key_hash = this.getKeyHash(table, query_res[i]);
        const fields_keys = yield managerMap.redis.hkeys(key_hash);
        if (F.isNull(fields_keys)) continue;
        const fields_data = yield managerMap.redis.hmget(key_hash, fields_keys);
        if (F.isNull(fields_data)) continue;

        const insert_item = {};
        del_key_array.push(key_hash);
        for (let j = 0; j < fields_keys.length; j++) {
            const field = fields_keys[j];
            const old_value = fields_data[j];
            const is_updated = hasOwn.call(option.data, field);

            // The record gets a new id, so its old id must leave the range index.
            if (isNumericText(old_value) || (is_updated && typeof option.data[field] === 'number')) {
                yield managerMap.redis.zrem(this.getKeySortSet(table, field), query_res[i]);
            }

            if (is_updated) {
                // Only the mapping keys of updated fields are queued for deletion.
                const key_set = this.getKeySet(table, field, old_value);
                if (F.isNull(del_key_record[key_set])) {
                    del_key_array.push(key_set);
                    del_key_record[key_set] = 1;
                }
                insert_item[field] = option.data[field];
            } else {
                insert_item[field] = isNumericText(old_value) ? Number(old_value) : old_value;
            }
        }
        // Fields present only in the update payload are added to the record.
        for (let field in option.data) {
            if (hasOwn.call(insert_item, field) === false) {
                insert_item[field] = option.data[field];
            }
        }
        update_data.push(insert_item);
    }
    yield managerMap.redis.del(del_key_array);
    // Re-insert the merged records.
    for (let i = 0; i < update_data.length; i++) {
        yield this.insertRedisDB(table, {
            'data': update_data[i],
            'expire': option.expire,
            'index': option.index
        });
    }
    return {del_key_array: del_key_array};
};
/**
 * Delete records.
 * Every matching record's hash is deleted together with the field/value
 * set keys of its fields, and its id is removed from the sorted sets of
 * its numeric fields.
 * Fixes over the previous version: the id is removed from the sorted set
 * for EVERY matched record (the de-duplication map used to skip all records
 * after the first one), and negative or fractional values are recognised
 * as numeric. ZREM on a sorted set that lacks the member is a no-op, so a
 * permissive numeric test is safe.
 * NOTE(review): each field/value set is deleted as a whole key, which also
 * drops the index entries of other records sharing that value. Removing
 * only the id needs SREM; no such wrapper is visible here - confirm and
 * switch if managerMap.redis offers one.
 * @param table
 * @param option
 * option contains:
 * where   string, e.g. "name = ? and age > ?", required, supports = > < >= <=
 * values  array with one entry per `?` placeholder
 * @returns {} true when nothing matched, otherwise an object with del_key_array
 */
this.deleteRedisDB = function* (table, option) {
    if (F.isNull(table) || F.isNull(option) || F.isNull(option.where)) F.throwErr('params err.');
    const query_res = yield this.queryResultFromRedisDB(table, option);
    if (F.isNull(query_res)) return true;

    const del_key_array = [];
    const del_key_record = {}; // set keys already queued for deletion
    for (let i = 0; i < query_res.length; i++) {
        const key_hash = this.getKeyHash(table, query_res[i]);
        const fields_keys = yield managerMap.redis.hkeys(key_hash);
        if (F.isNull(fields_keys)) continue;
        const fields_data = yield managerMap.redis.hmget(key_hash, fields_keys);
        if (F.isNull(fields_data)) continue;

        del_key_array.push(key_hash);
        for (let j = 0; j < fields_keys.length; j++) {
            const stored = fields_data[j];
            const key_set = this.getKeySet(table, fields_keys[j], stored);
            if (F.isNull(del_key_record[key_set])) {
                del_key_array.push(key_set);
                del_key_record[key_set] = 1;
            }
            const looks_numeric = stored !== null && stored !== undefined && stored !== '' && Number.isFinite(Number(stored));
            if (looks_numeric) {
                // Drop this record's id from the field's range index.
                yield managerMap.redis.zrem(this.getKeySortSet(table, fields_keys[j]), query_res[i]);
            }
        }
    }
    yield managerMap.redis.del(del_key_array);
    return {del_key_array: del_key_array};
};
};
其中F.isNull函数用于判断字符串、数组、对象是否为空,managerMap.redis调用的方法是在redis原生方法之上再封装的一层,managerMap.redis.js部分函数代码:
// Increment a hash field through the client's HINCRBY (step defaults to 1) and return the client's reply.
this.hincrby = function* (key, field, increment = 1) {
    const reply = yield redisLib.redisCo.HINCRBY(key, field, increment);
    return reply;
};
// Return the client's TTL reply for `key`.
this.ttl = function* (key) {
    const remaining = yield redisLib.redisCo.ttl(key);
    return remaining;
};
/**
 * Write fields to a hash and optionally set the key's expiry.
 * An expiry of -1 (documented by the callers as "never expire") now clears
 * any existing timeout with PERSIST; passing it to EXPIRE would delete the
 * key that was just written.
 * @param key    hash key
 * @param data   flat array: field, value, field, value, ...
 * @param expire expiry in seconds, -1 for no expiry; skipped when F.isNull(expire)
 * @returns reply of the last command issued (HMSET, EXPIRE or PERSIST)
 */
this.hmset = function* (key, data, expire = null) {
    const insert_data = [key];
    insert_data.push.apply(insert_data, data);
    let res = yield redisLib.redisCo.hmset(insert_data);
    if (!F.isNull(expire)) {
        if (expire == -1) {
            res = yield redisLib.redisCo.persist(key);
        } else {
            res = yield redisLib.redisCo.expire(key, expire);
        }
    }
    return res;
};
// Read the given fields of a hash; the client receives one argument list: key followed by the fields.
this.hmget = function* (key, fields) {
    const command_args = [key];
    Array.prototype.push.apply(command_args, fields);
    const values = yield redisLib.redisCo.hmget(command_args);
    return values;
};
// Return the client's HKEYS reply (the field names) for a hash key.
this.hkeys = function* (key) {
    const field_names = yield redisLib.redisCo.hkeys(key);
    return field_names;
};
// Return the client's HGETALL reply for a hash key.
this.hgetall = function* (key) {
    const entries = yield redisLib.redisCo.hgetall(key);
    return entries;
};
/**
 * Add one member or an array of members to a set and optionally set the key's expiry.
 * An expiry of -1 (documented by the callers as "never expire") now clears
 * any existing timeout with PERSIST; passing it to EXPIRE would delete the
 * set that was just written.
 * @param key    set key
 * @param value  a single member or an array of members
 * @param expire expiry in seconds, -1 for no expiry; skipped when F.isNull(expire)
 * @returns reply of the last command issued (SADD, EXPIRE or PERSIST)
 */
this.sadd = function* (key, value, expire = null) {
    const fields_array = [key];
    if (Array.isArray(value)) {
        fields_array.push.apply(fields_array, value);
    } else {
        fields_array.push(value);
    }
    let res = yield redisLib.redisCo.sadd(fields_array);
    if (!F.isNull(expire)) {
        if (expire == -1) {
            res = yield redisLib.redisCo.persist(key);
        } else {
            res = yield redisLib.redisCo.expire(key, expire);
        }
    }
    return res;
};
// Return the client's SINTER reply for the given set keys.
this.sinter = function* (keys) {
    const members = yield redisLib.redisCo.sinter(keys);
    return members;
};
// Return the client's EXISTS reply for `key`.
this.exists = function* (key) {
    const reply = yield redisLib.redisCo.exists(key);
    return reply;
};
/**
 * Add a member with a score to a sorted set and optionally set the key's expiry.
 * An expiry of -1 (documented by the callers as "never expire") now clears
 * any existing timeout with PERSIST; passing it to EXPIRE would delete the
 * sorted set that was just written.
 * @param key    sorted-set key
 * @param value  member
 * @param score  score of the member
 * @param expire expiry in seconds, -1 for no expiry; skipped when F.isNull(expire)
 * @returns reply of the last command issued (ZADD, EXPIRE or PERSIST)
 */
this.zadd = function* (key, value, score, expire = null) {
    // ZADD takes the score before the member.
    let res = yield redisLib.redisCo.ZADD([key, score, value]);
    if (!F.isNull(expire)) {
        if (expire == -1) {
            res = yield redisLib.redisCo.persist(key);
        } else {
            res = yield redisLib.redisCo.expire(key, expire);
        }
    }
    return res;
};
// Return the client's ZRANGEBYSCORE reply for the score range min..max of a sorted set.
this.zrangebyscore = function* (key, min, max) {
    const command_args = [key, min, max];
    const members = yield redisLib.redisCo.zrangebyscore(command_args);
    return members;
};
// Remove one member or an array of members from a sorted set and return the client's reply.
this.zrem = function* (key, value) {
    const command_args = [key];
    if (Array.isArray(value)) {
        Array.prototype.push.apply(command_args, value);
    } else {
        command_args.push(value);
    }
    const reply = yield redisLib.redisCo.zrem(command_args);
    return reply;
};
使用例子:
// 插入数据
yield mgr_map.redisDB.insertRedisDB('user', {data:{name:'fdl',age:22,sex:0,score:21}, expire:5080, index:'name, age'});
yield mgr_map.redisDB.insertRedisDB('user', {data:{name:'fdsl',age:22,sex:1,score:64}, expire:5080, index:'name, age'});
yield mgr_map.redisDB.insertRedisDB('user', {data:{name:'fd2l',age:28,sex:0,score:58}, expire:5080, index:'name, age'});
yield mgr_map.redisDB.insertRedisDB('user', {data:{name:'fdl',age:23,sex:0,score:22}, expire:5080, index:'name, age'});
yield mgr_map.redisDB.insertRedisDB('user', {data:{name:'fd8l',age:12,sex:1,score:24}, expire:5080, index:'name, age'});
// 查询
res = yield mgr_map.redisDB.queryRedisDB('user', {
where:"name=? and age > 10 and score <= ?",
values:['fdl', 22],
fields:'name , age,score',
limit:'0,100',
order:'age asc'
});
// 更新
yield mgr_map.redisDB.updateRedisDB('user', {
where:"name=?",
values:['fd2l'],
data:{
'name':'测试',
'age':100,
'contry':'china'
},
expire:5080,
index:'name, age'
});
// 删除
res = yield mgr_map.redisDB.deleteRedisDB('user', {
where:"name=? and age > 10",
values:['fdl']
});