目录
1.6 Expire, ExpireAt 设置缓存有效期;TTL, PTTL 获取缓存有效期
1.8 Del 删除缓存项(同步,会造成 redis 主线程阻塞)
1.9 Unlink 删除缓存项(异步,不会造成 redis 主线程阻塞)
2.0 FlushDB 清空当前连接的 redis 正在使用的数据库 (同步,会造成主线程阻塞)
2.1 FlushDBAsync 清空当前连接的 redis 正在使用的数据库(异步,子线程执行)
2.2 FlushAll 清空当前连接的 redis 的所有数据库(同步,会造成主线程阻塞)
2.3 FlushAllAsync 清空当前连接的 redis 的所有数据库(异步,子线程执行)
前景提要:下载 redis 和 redis 可视化管理工具(windows 机器测试使用)
go get github.com/go-redis/redis/v8
go get github.com/go-redis/redis/v9
func NewRedisCmd(cfg *conf.Data) (redis.Cmdable, error) {
rdb := redis.NewClient(&redis.Options{
Network: cfg.RedisCfg.Net, // "tcp",
Addr: cfg.RedisCfg.Addr, // "127.0.0.1:6379",
Username: cfg.RedisCfg.User, // "amin"
Password: cfg.RedisCfg.Pwd, // "admin",
ReadTimeout: cfg.RedisCfg.ReadTimeout, // 0.1*time.Second,
WriteTimeout: cfg.RedisCfg.WriteTimeout, // 0.1*time.Second,
})
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := rdb.Ping(timeoutCtx).Err(); err != nil {
return nil, fmt.Errorf("redis 连接失败: %v", err)
}
return rdb, nil
}
// go-redis 对输入的 go 数据结构的处理
func (w *Writer) WriteArg(v interface{}) error {
switch v := v.(type) {
case nil:
return w.string("")
case string:
return w.string(v)
case []byte:
return w.bytes(v)
case int:
return w.int(int64(v))
case int8:
return w.int(int64(v))
case int16:
return w.int(int64(v))
case int32:
return w.int(int64(v))
case int64:
return w.int(v)
case uint:
return w.uint(uint64(v))
case uint8:
return w.uint(uint64(v))
case uint16:
return w.uint(uint64(v))
case uint32:
return w.uint(uint64(v))
case uint64:
return w.uint(v)
case float32:
return w.float(float64(v))
case float64:
return w.float(v)
case bool:
if v {
return w.int(1)
}
return w.int(0)
case time.Time:
w.numBuf = v.AppendFormat(w.numBuf[:0], time.RFC3339Nano)
return w.bytes(w.numBuf)
case time.Duration:
return w.int(v.Nanoseconds())
case encoding.BinaryMarshaler:
b, err := v.MarshalBinary()
if err != nil {
return err
}
return w.bytes(b)
default:
return fmt.Errorf(
"redis: can't marshal %T (implement encoding.BinaryMarshaler)", v)
}
}
// go-redis 转换字节值为 go 数据结构的方法
// Scan parses bytes `b` to `v` with appropriate type.
// nolint:gocyclo
func Scan(b []byte, v interface{}) error {
switch v := v.(type) {
case nil:
return fmt.Errorf("redis: Scan(nil)")
case *string:
*v = util.BytesToString(b)
return nil
case *[]byte:
*v = b
return nil
case *int:
var err error
*v, err = util.Atoi(b)
return err
case *int8:
n, err := util.ParseInt(b, 10, 8)
if err != nil {
return err
}
*v = int8(n)
return nil
case *int16:
n, err := util.ParseInt(b, 10, 16)
if err != nil {
return err
}
*v = int16(n)
return nil
case *int32:
n, err := util.ParseInt(b, 10, 32)
if err != nil {
return err
}
*v = int32(n)
return nil
case *int64:
n, err := util.ParseInt(b, 10, 64)
if err != nil {
return err
}
*v = n
return nil
case *uint:
n, err := util.ParseUint(b, 10, 64)
if err != nil {
return err
}
*v = uint(n)
return nil
case *uint8:
n, err := util.ParseUint(b, 10, 8)
if err != nil {
return err
}
*v = uint8(n)
return nil
case *uint16:
n, err := util.ParseUint(b, 10, 16)
if err != nil {
return err
}
*v = uint16(n)
return nil
case *uint32:
n, err := util.ParseUint(b, 10, 32)
if err != nil {
return err
}
*v = uint32(n)
return nil
case *uint64:
n, err := util.ParseUint(b, 10, 64)
if err != nil {
return err
}
*v = n
return nil
case *float32:
n, err := util.ParseFloat(b, 32)
if err != nil {
return err
}
*v = float32(n)
return err
case *float64:
var err error
*v, err = util.ParseFloat(b, 64)
return err
case *bool:
*v = len(b) == 1 && b[0] == '1'
return nil
case *time.Time:
var err error
*v, err = time.Parse(time.RFC3339Nano, util.BytesToString(b))
return err
case *time.Duration:
n, err := util.ParseInt(b, 10, 64)
if err != nil {
return err
}
*v = time.Duration(n)
return nil
case encoding.BinaryUnmarshaler:
return v.UnmarshalBinary(b)
default:
return fmt.Errorf(
"redis: can't unmarshal %T (consider implementing BinaryUnmarshaler)", v)
}
}
// 因此,我们想要存储 go 结构体对象的话,需要实现 encoding 包的以下两个接口
package encoding
// BinaryMarshaler is the interface implemented by an object that can
// marshal itself into a binary form.
//
// MarshalBinary encodes the receiver into a binary form and returns the result.
type BinaryMarshaler interface {
MarshalBinary() (data []byte, err error)
}
// BinaryUnmarshaler is the interface implemented by an object that can
// unmarshal a binary representation of itself.
//
// UnmarshalBinary must be able to decode the form generated by MarshalBinary.
// UnmarshalBinary must copy the data if it wishes to retain the data
// after returning.
type BinaryUnmarshaler interface {
UnmarshalBinary(data []byte) error
}
// 然后,我们就可以优雅的将 go 结构体对象存储到 redis 了
const (
goRedisTest11 = "goRedis:test11"
)
// 结构体名大小写都无所谓,只要符合自己的业务需求使用就行
// 但是结构体成员变量一定要大写,否则将无法序列化到字节数组中
type testObj struct {
Name string
Cnt int32
}
// 这里是为了在编译前校验 testObj 这个结构体有没有实现 ecoding 包的
// BinaryMarshaler, BinaryUnmarshaler 两个接口,如果没有实现,编译前就会报错
var _ encoding.BinaryMarshaler = new(testObj)
var _ encoding.BinaryUnmarshaler = new(testObj)
// 使用 json.Marshal 实现 BinaryMarshaler 接口
func (t *testObj) MarshalBinary() (data []byte, err error) {
return json.Marshal(t)
}
// 使用 json.Unmarshal 实现 BinaryUnmarshaler 接口
func (t *testObj) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, t)
}
func main() {
ctx := context.Background()
obj11 := &testObj{
Name: "test11",
Cnt: 111,
}
setResult, err := rdb.Set(ctx, goRedisTest11, obj11, 10*time.Second).Result()
if err != nil {
fmt.Printf("err: %v", err)
}
fmt.Printf("setResult: %v\n", setResult)
_obj11 := &testObj{}
err = rdb.Get(ctx, goRedisTest11).Scan(_obj11)
if err != nil {
return
}
fmt.Printf("_obj11: %v", _obj11)
}
// setResult: OK
// _obj11: &{test11 111}
// Process finished with the exit code 0
// 源码
func (c cmdable) Keys(ctx context.Context, pattern string) *StringSliceCmd {
cmd := NewStringSliceCmd(ctx, "keys", pattern)
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 根据正则获取 keys
result, err := rdb.Keys(ctx, "*").Result()
if err != nil {
return
}
fmt.Println(result)
// [goRedis:test9 goRedis:test3 goRedis:test4 goRedis:test1 goRedis:test7 goRedis:test2 goRedis:test6 goRedis:test10 goRedis:test8 goRedis:test5]
}
// 源码
func (c cmdable) Type(ctx context.Context, key string) *StatusCmd {
cmd := NewStatusCmd(ctx, "type", key)
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 获取 key 对应的值类型
result, err := rdb.Type(ctx, "goRedis:test1").Result()
if err != nil {
return
}
fmt.Println(result)
// string
}
// 源码
func (c cmdable) Exists(ctx context.Context, keys ...string) *IntCmd {
args := make([]interface{}, 1+len(keys))
args[0] = "exists"
for i, key := range keys {
args[1+i] = key
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 检测某个缓存项是否存在, 虽然可以同时检测多个,但是每次检测一个会比较好用,具体看业务需求决定怎么用
result, err := rdb.Exists(ctx, "goRedis:test6").Result()
if err != nil {
return
}
if result == 0 {
fmt.Println("goRedis:test6 不存在")
} else {
fmt.Println("goRedis:test6 存在")
}
// goRedis:test6 存在
}
设置好了缓存项后,可以再设置有效期
Expire()方法是设置缓存项在某个时间段(time.Duration)后过期,ExpireAt()方法是设置缓存项在某个时间点(time.Time)过期失效。
// 源码(Expire, TTL 函数)
func (c cmdable) Expire(ctx context.Context, key string, expiration time.Duration) *BoolCmd {
return c.expire(ctx, key, expiration, "")
}
func (c cmdable) expire(
ctx context.Context, key string, expiration time.Duration, mode string,
) *BoolCmd {
args := make([]interface{}, 3, 4)
args[0] = "expire"
args[1] = key
args[2] = formatSec(ctx, expiration)
if mode != "" {
args = append(args, mode)
}
cmd := NewBoolCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) TTL(ctx context.Context, key string) *DurationCmd {
cmd := NewDurationCmd(ctx, time.Second, "ttl", key)
_ = c(ctx, cmd)
return cmd
}
// 源码(ExpireAt, PTTL 函数)
func (c cmdable) ExpireAt(ctx context.Context, key string, tm time.Time) *BoolCmd {
cmd := NewBoolCmd(ctx, "expireat", key, tm.Unix())
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) PTTL(ctx context.Context, key string) *DurationCmd {
cmd := NewDurationCmd(ctx, time.Millisecond, "pttl", key)
_ = c(ctx, cmd)
return cmd
}
// 实践(Expire, TTL 函数)
func main() {
ctx := context.Background()
const goRedisTest5 = "goRedis:test5"
const goRedisTest4 = "goRedis:test4"
if res, err := rdb.TTL(ctx, goRedisTest5).Result(); err != nil {
log.Errorf("获取缓存项 %v 有效时长失败: %v", goRedisTest5, err)
return
} else {
log.Infof("缓存项 %v 还有 %v 过期", goRedisTest5, res)
}
// 设置缓存项在之后一段时间后失效
success, err := rdb.Expire(ctx, goRedisTest5, 3600*time.Second).Result()
if err != nil {
log.Errorf("设置缓存项 %v 有效时长失败: %v", goRedisTest5, err)
return
}
if success {
log.Infof("缓存项 %v 已变更有效时长", goRedisTest5)
res, err := rdb.TTL(ctx, goRedisTest5).Result()
if err != nil {
log.Errorf("获取缓存项 %v 有效时长失败: %v", goRedisTest5, err)
return
}
log.Infof("缓存项 %v 还有 %v 过期", goRedisTest5, res)
} else {
log.Errorf("设置缓存项 %v 有效时长失败: %v", goRedisTest5, err)
return
}
// INFO msg=缓存项 goRedis:test5 还有 58m24s 过期
// INFO msg=缓存项 goRedis:test5 已变更有效时长
// INFO msg=缓存项 goRedis:test5 还有 1h0m0s 过期
}
// 实践(ExpireAt, PTTL 函数)
func main() {
ctx := context.Background()
const goRedisTest5 = "goRedis:test5"
const goRedisTest4 = "goRedis:test4"
if res, err := rdb.PTTL(ctx, goRedisTest4).Result(); err != nil {
log.Errorf("获取缓存项 %v 有效时长失败: %v", goRedisTest4, err)
return
} else {
log.Infof("缓存项 %v 还有 %v 过期", goRedisTest4, res)
}
// 设置缓存项在之后一段时间后失效
success, err := rdb.ExpireAt(ctx, goRedisTest4, time.Date(2023, 04, 15, 00, 00, 00, 00, time.Local)).Result()
if err != nil {
log.Errorf("设置缓存项 %v 过期时间失败: %v", goRedisTest4, err)
return
}
if success {
log.Infof("缓存项 %v 已变更过期时间", goRedisTest4)
res, err := rdb.PTTL(ctx, goRedisTest4).Result()
if err != nil {
log.Errorf("获取缓存项 %v 过期时间失败: %v", goRedisTest4, err)
return
}
log.Infof("缓存项 %v 还有 %v 过期", goRedisTest4, res)
} else {
log.Errorf("设置缓存项 %v 过期时间失败: %v", goRedisTest4, err)
return
}
// INFO msg=缓存项 goRedis:test4 还有 -1ns 过期
// INFO msg=缓存项 goRedis:test4 已变更过期时间
// INFO msg=缓存项 goRedis:test4 还有 15h49m54.18s 过期
// 源码
func (c cmdable) DBSize(ctx context.Context) *IntCmd {
cmd := NewIntCmd(ctx, "dbsize")
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 查看当前数据库缓存数
result, err := rdb.DBSize(ctx).Result()
if err != nil {
log.Errorf("查询当前数据库缓存数失败: %v", err)
return
}
log.Infof("当前数据库共有 %v 个缓存项", result)
// INFO msg=当前数据库共有 6 个缓存项
}
// 源码
func (c cmdable) Del(ctx context.Context, keys ...string) *IntCmd {
args := make([]interface{}, 1+len(keys))
args[0] = "del"
for i, key := range keys {
args[1+i] = key
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 删除缓存项
result, err := rdb.Del(ctx, "goRedis:test1", "goRedis:test2").Result()
if err != nil {
return
}
fmt.Printf("删除了 %v 个缓存项", result)
// 删除了 2 个缓存项
}
// 源码
func (c cmdable) Unlink(ctx context.Context, keys ...string) *IntCmd {
args := make([]interface{}, 1+len(keys))
args[0] = "unlink"
for i, key := range keys {
args[1+i] = key
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 异步删除指定的缓存项
toDelKeys := []string{"key1", "key2"}
result, err := rdb.Unlink(ctx, toDelKeys...).Result()
if err != nil {
log.Errorf("后台删除缓存项失败: %v", err)
return
}
log.Warnf("缓存项%v已删除: %v", toDelKeys, result)
// WARN msg=缓存项[key1 key2]已删除: 0
}
flushdb 命令会清空当前连接的 redis 服务器正在使用的数据库,但不会进行持久化,并且是同步的方式,可能会由于缓存数目过大而造成 redis 主线程阻塞。
// 源码
func (c cmdable) FlushDB(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "flushdb")
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 同步清除当前连接的 redis 服务器正在使用的库,不会将数据持久化
_, err := rdb.FlushDB(ctx).Result()
if err != nil {
log.Errorf("清空当前数据库失败: %v", err)
return
}
log.Infof("已清空当前数据库")
// 查询当前数据库的缓存数
result, err := rdb.DBSize(ctx).Result()
if err != nil {
log.Errorf("查询当前数据库缓存数失败: %v", err)
return
}
log.Infof("当前数据有 %v 个缓存项", result)
// INFO msg=已清空当前数据库
// INFO msg=当前数据有 0 个缓存项
}
flushdb async 命令会清空当前连接的 redis 服务器正在使用的数据库,并且采用异步的方式,不会造成主线程阻塞。
--> 扩展知识:redis 主线程启动后,会使用操作系统提供的 pthread_create 系统调用函数生成三个子线程,分别由他们执行:AOF 日志写操作,键值对删除操作,文件打开关闭的异步执行,主线程则通过一个链表形式的任务队列和子线程进行交互。
--> 继续扩展:当使用异步的形式清除缓存项时,redis 主线程会把这个操作封装成一个任务,放入到任务队列中,然后给客户端返回一个完成信息,表明收到的缓存项删除任务(删除某些键值对、清空某个数据库或清空所有数据库)已经吩咐下去了。实际上此时缓存项可能还没删除,要等到专门处理键值对删除操作的子线程从任务队列中读取到该任务时,才实际开始执行清理操作,并释放对应的内存空间,这种方式也叫惰性删除,不会阻塞主线程,避免了对主线程的性能造成影响。
// 源码
func (c cmdable) FlushDBAsync(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "flushdb", "async")
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 异步清除当前连接的 redis 服务器正在使用的库,不会将数据持久化
result, err := rdb.FlushDBAsync(ctx).Result()
if err != nil {
log.Errorf("异步清除当前连接的 redis 服务器正在使用的库失败: %v", err)
return
}
log.Warnf("已清除当前连接的 redis[%v] 服务器正在使用的库: %v", rdb, result)
// WARN msg=已清除当前连接的 redis[Redis<localhost:6379 db:0>] 服务器正在使用的库: OK
}
// 源码
func (c cmdable) FlushAll(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "flushall")
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 同步清除当前连接的 redis 服务器的所有库,会将数据持久化
result, err := rdb.FlushAll(ctx).Result()
if err != nil {
log.Errorf("同步清空当前连接的 redis 服务器的所有数据库失败")
return
}
log.Warnf("已同步清空当前连接的 redis[%v] 服务器的所有数据库: %v", rdb, result)
// WARN msg=已同步清空当前连接的 redis[Redis<localhost:6379 db:0>] 服务器的所有数据库: OK
}
// 源码
func (c cmdable) FlushAllAsync(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "flushall", "async")
_ = c(ctx, cmd)
return cmd
}
// 实践
func main() {
ctx := context.Background()
// 异步清除当前连接的 redis 服务器的所有库,会将数据持久化
result, err := rdb.FlushAllAsync(ctx).Result()
if err != nil {
log.Errorf("异步清空当前连接的 redis 服务器的所有数据库失败")
return
}
log.Warnf("已异步清空当前连接的 redis[%v] 服务器的所有数据库: %v", rdb, result)
// WARN msg=已异步清空当前连接的 redis[Redis<localhost:6379 db:0>] 服务器的所有数据库: OK
}