官方文档:https://pypi.org/project/flask-redis/
redis-py(Python 的 Redis 客户端)文档:https://redis-py.readthedocs.io/en/latest/
下面是自己封装的 OwnRedis 类(基于 flask-redis):
from flask_redis import FlaskRedis

# Shared extension instance, created here without a Flask app.
# It has to be bound to an application with ``redis_client.init_app(app)``
# before it can talk to Redis.
redis_client = FlaskRedis()
class OwnRedis:
    """Class-level convenience wrapper around the shared ``redis_client``.

    Both helpers use subscript syntax on ``redis_client``, so they behave
    exactly like ``redis_client[key]`` and ``redis_client[key] = value``.
    """

    @classmethod
    def get(cls, key):
        """Return whatever ``redis_client[key]`` yields for ``key``."""
        stored = redis_client[key]
        return stored

    @classmethod
    def set(cls, key, value):
        """Store ``value`` under ``key`` via ``redis_client[key] = value``."""
        redis_client[key] = value
# Register the Redis (distributed database) plugin
def __app_register_redis(self):
    """Bind the shared ``redis_client`` extension to ``self.app``."""
    # The import is done inside the function, so it only runs when this
    # method is called (presumably to avoid a circular import - confirm).
    from app.libs.redis.own_redis import redis_client as shared_client

    shared_client.init_app(self.app)
以下是 flask-redis 库中 FlaskRedis 类的源码(内部实现了类似 dict 的下标访问方式):
try:
    import redis
except ImportError:
    # We can still allow custom provider-only usage without redis-py being installed
    redis = None


class FlaskRedis(object):
    """Flask extension object that builds and proxies a Redis client.

    The client is created in :meth:`init_app` by calling
    ``provider_class.from_url(url, **provider_kwargs)``. Attribute access and
    item access (``obj[key]``, ``obj[key] = value``, ``del obj[key]``) are
    forwarded to that client.
    """

    def __init__(self, app=None, strict=True, config_prefix="REDIS", **kwargs):
        """Create the extension, optionally binding it to ``app`` right away.

        Args:
            app: Flask application; when given, :meth:`init_app` is called.
            strict: Select ``redis.StrictRedis`` when true, else ``redis.Redis``.
            config_prefix: Prefix of the ``<PREFIX>_URL`` config key; its
                lowercase form is the key used in ``app.extensions``.
            **kwargs: Extra keyword arguments passed to ``from_url``.
        """
        self._redis_client = None
        if redis is None:
            # redis-py is not installed. Leave the provider unset instead of
            # touching ``redis.StrictRedis`` so that from_custom_provider()
            # can still build an instance and install its own provider.
            self.provider_class = None
        else:
            self.provider_class = redis.StrictRedis if strict else redis.Redis
        self.provider_kwargs = kwargs
        self.config_prefix = config_prefix

        if app is not None:
            self.init_app(app)

    @classmethod
    def from_custom_provider(cls, provider, app=None, **kwargs):
        """Build an instance whose client comes from ``provider.from_url``.

        Args:
            provider: Object exposing ``from_url(url, **kwargs)``; must not
                be ``None``.
            app: Optional Flask application to bind immediately.
            **kwargs: Forwarded to the constructor.

        Returns:
            The new :class:`FlaskRedis` instance.
        """
        assert provider is not None, "your custom provider is None, come on"

        # We never pass the app parameter here, so we can call init_app
        # ourselves later, after the provider class has been set
        instance = cls(**kwargs)
        instance.provider_class = provider
        if app is not None:
            instance.init_app(app)
        return instance

    def init_app(self, app, **kwargs):
        """Create the client from ``app.config`` and register on ``app``.

        The URL is read from ``app.config["<PREFIX>_URL"]`` and defaults to
        ``redis://localhost:6379/0``. ``kwargs`` are merged into the stored
        provider keyword arguments before the client is built.

        Raises:
            RuntimeError: No provider is available (redis-py is missing and
                no custom provider was supplied).
        """
        if self.provider_class is None:
            raise RuntimeError(
                "redis-py is not installed and no custom provider was supplied"
            )

        redis_url = app.config.get(
            "{0}_URL".format(self.config_prefix), "redis://localhost:6379/0"
        )

        self.provider_kwargs.update(kwargs)
        self._redis_client = self.provider_class.from_url(
            redis_url, **self.provider_kwargs
        )

        if not hasattr(app, "extensions"):
            app.extensions = {}
        app.extensions[self.config_prefix.lower()] = self

    def __getattr__(self, name):
        """Forward unknown attribute lookups to the underlying client."""
        # __getattr__ only runs when normal lookup fails. If ``_redis_client``
        # itself is missing (instance created without __init__, e.g. by
        # copy/pickle), reading it below would re-enter this method forever.
        if name == "_redis_client":
            raise AttributeError(name)
        return getattr(self._redis_client, name)

    def __getitem__(self, name):
        """Return ``client[name]`` from the underlying client."""
        return self._redis_client[name]

    def __setitem__(self, name, value):
        """Perform ``client[name] = value`` on the underlying client."""
        self._redis_client[name] = value

    def __delitem__(self, name):
        """Perform ``del client[name]`` on the underlying client."""
        del self._redis_client[name]