django redis-cache

施华奥
2023-12-01

Django Redis Cahe

项目中有些接口的数据需要做缓存,如果基于django开发的接口,那么我们可以使用django-reidis插件,只需要做一些配置,即可帮我们实现目的。省去实现代码。

需要在你的项目settings.py文件加入以下配置项:

# 接口缓存配置
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://{host}:{port}/10".format(host=redis_host, port=redis_port),
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 10},
            "PASSWORD": redis_password,
        }
    }
}

BACKEND 必须要指定,LOCATION参数配置redis资源地址信息,
OPTIONS可选项参数,可以指定redis入口类,以及连接池和是否使用密码的参数。一般的时候这样的配置即可运行起来,结合开发的接口,可以测试出效果。

毕竟是使用默认的参数配置,在某些环境可能需要定制修改,比如使用自定义的连接池,或者使用redis集群连接等,这是需要一些定时代码。
如果是自定义连接池,可以继承ConnecitonPool类,重写自己需要的方法,然后在OPTIONS下配置CONNECTION_POOL_CLASS。
还可以使用自己定义的reidis client类,默认使用redis.client.StrictClient,可以基于Redis类定制代码。使用REDIS_CLIENT_CLASS配置客户端入口类,REDIS_CLIENT_KWARGS配置入口类方法的参数。

配置示例:

# 接口缓存配置
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://{host}:{port}/10".format(host=REDIS_CONFIG.get_host(), port=REDIS_CONFIG.get_port()),
        "OPTIONS": {
            # "CLIENT_CLASS": "django_redis.client.DefaultClient",
            # "CONNECTION_POOL_KWARGS": {"max_connections": 10},
            # "PASSWORD": REDIS_CONFIG.get_password(),
            "REDIS_CLIENT_CLASS":  "utilitys.redis_cache_connection_pool.RedisCacheClient",
        }
    }
}

自定义客户端实际可以不用LOCATION参数,但django-redis必须要配置他,否则会报错。而且还不能为空。

连接池及客户端简单代码,目的是不用默认的。

import redis
from redis.client import Redis
from redis.sentinel import Sentinel, SentinelConnectionPool
from config.application_config.db_config.redis_config import REDIS_CONFIG
from loglib.logger import logger


class RedisCacheConnectionPool(object):

    def __init__(self, *args, **kwargs):
        self._cluster_flag = REDIS_CONFIG.get_cluster_flag()

    def get_connection_pool(self):
        """

        :return:
        """
        if self._cluster_flag:
            # 使用哨兵模式建立连接池

            sen = Sentinel(sentinels=REDIS_CONFIG.get_sentinel_node_list(),
                           sentinel_kwargs={"max_connections": REDIS_CONFIG.get_max_connections()},
                           max_connections=REDIS_CONFIG.get_max_connections()
                           )

            _args = {
                "max_connections": REDIS_CONFIG.get_max_connections(),
                "db": REDIS_CONFIG.get_db()
            }

            if REDIS_CONFIG.get_password() != "":
                _args["password"] = REDIS_CONFIG.get_password()

            return SentinelConnectionPool(service_name="master1", sentinel_manager=sen, **_args)

        else:
            # 如果只有单主机节点,则调用默认的连接池方式连接
            _args = {
                "host": REDIS_CONFIG.get_host(),
                "port": REDIS_CONFIG.get_port(),
                "db": REDIS_CONFIG.get_db(),
            }
            if REDIS_CONFIG.get_password() != "":
                _args["password"] = REDIS_CONFIG.get_password()

            return redis.ConnectionPool(max_connections=REDIS_CONFIG.get_max_connections(),
                                        **_args
                                        )


class RedisCacheClient(Redis):

    def __init__(self, *args, **kwargs):
        super(RedisCacheClient, self).__init__(connection_pool=RedisCacheConnectionPool(args, kwargs).get_connection_pool())

    def close(self, *args, **kwargs):
        """

        :param args:
        :param kwargs:
        :return:
        """
        logger.debug("django redis cache execute close method, args: %s, %s", args, kwargs)

在实验的时候主要是用单个节和哨兵模式的集群做测试,因为默认配置不好配哨兵模式的参数,所以只有写一些代码。

课外参考

 类似资料: