说到缓存这个概念,我想大家应该都不陌生 ,以Redis和Memcache为代表的缓存应用基本成为了现在微服务架构的标配了。
事实上,并不是说要用缓存就必须部署Redis等服务,比如在以Python为开发语言的小型单体应用中,我们可以使用functools.lru_cache来实现缓存机制,当然也可以在这个基础上二次封装来满足自己的需求,比如加上缓存过期时间等。
首先,通过一个简单的例子以了解缓存机制的概念,如下代码所示,注意这里只是说下大概的机制,不需要采用这种方法,更优雅的方法是采用functools.lru_cache。
# -*- coding: utf-8 -*-
import random
import datetime
class MyCache:
"""缓存类"""
def __init__(self):
# 用字典结构以 kv 的形式缓存数据
self.cache = {}
# 限制缓存的大小,因为缓存的空间有限
# 所以当缓存太大时,需要将旧的缓存舍弃掉
self.max_cache_size = 10
def __contains__(self, key):
"""根据该键是否存在于缓存当中返回 True 或者 False"""
return key in self.cache
def get(self, key):
"""从缓存中获取数据"""
data = self.cache[key]
data["date_accessed"] = datetime.datetime.now()
return data["value"]
def add(self, key, value):
"""更新该缓存字典,如果缓存太大则先删除最早条目"""
if key not in self.cache and len(self.cache) >= self.max_cache_size:
self.remove_oldest()
self.cache[key] = {
'date_accessed': datetime.datetime.now(),
'value': value
}
def remove_oldest(self):
"""删除具备最早访问日期的输入数据"""
oldest_entry = None
for key in self.cache:
if oldest_entry is None:
oldest_entry = key
continue
curr_entry_date = self.cache[key]['date_accessed']
oldest_entry_date = self.cache[oldest_entry]['date_accessed']
if curr_entry_date < oldest_entry_date:
oldest_entry = key
self.cache.pop(oldest_entry)
@property
def size(self):
"""返回缓存容量大小"""
return len(self.cache)
if __name__ == '__main__':
# 测试缓存功能
cache = MyCache()
cache.add("test", sum(range(100000)))
assert cache.get("test") == cache.get("test")
keys = [
'red', 'fox', 'fence', 'junk', 'other', 'alpha', 'bravo', 'cal',
'devo', 'ele'
]
s = 'abcdefghijklmnop'
for i, key in enumerate(keys):
if key in cache:
continue
else:
value = ''.join([random.choice(s) for i in range(20)])
cache.add(key, value)
assert "test" not in cache
print(cache.cache)
在 Python 的 3.2 +版本中,引入了一个非常优雅的缓存机制,即 functool
模块中的 lru_cache
装饰器,可以直接将函数或类方法的结果缓存住,后续调用则直接返回缓存的结果。lru_cache
原型如下:
@functools.lru_cache(maxsize=None, typed=False)
使用 functools 模块的 lur_cache 装饰器,可以缓存最多 maxsize 个此函数的调用结果,从而提高程序执行的效率,特别适合于耗时的函数。参数 maxsize
为最多缓存的次数,如果为 None,则无限制,设置为 2 的幂时,性能最佳;如果 typed=True
(注意,在 functools32 中没有此参数),则不同参数类型的调用将分别缓存,例如 f(3) 和 f(3.0)会分别缓存。
LRU (Least Recently Used,最近最少使用) 算法是一种缓存淘汰策略。其根据数据的历史访问记录来进行淘汰,核心思想是,“如果数据最近被访问过,那么将来被访问的几率也更高”。该算法最初为操作系统中一种内存管理的页面置换算法,主要用于找出内存中较久时间没有使用的内存块,将其移出内存从而为新数据提供空间。其原理就如以上的简单示例。
被 lru_cache
装饰的函数会有 cache_clear
和 cache_info
两个方法,分别用于清除缓存和查看缓存信息。
以下为一个简单的 lru_cache 的使用效果,如果函数被调用则会使用print打印一条日志,如果走缓存了则不会。
from functools import lru_cache
@lru_cache(None)
def add(x, y):
print("calculating: %s + %s" % (x, y))
return x + y
print(add(1, 2))
print(add(1, 2))
print(add(2, 3))
运行结果如下:
calculating: 1 + 2
3
3
calculating: 2 + 3
5
从结果可以看出,当第二次调用 add(1, 2) 时,并没有真正执行函数体,而是直接返回缓存的结果。
我们接着来看下maxsize,typed参数的设置和使用,如下代码,maxsize=1代表只缓存一个结果,type=True代表严格区分类型,a=3与a=3.0是不同的场景,会被当做2种结果缓存。
from functools import lru_cache
@lru_cache(maxsize=1, typed=True)
def add(x, y):
print("calculating: %s + %s" % (x, y))
return x + y
print(add(1, 2))
print(add(1, 2))
print(add(2, 3))
print(add(2, 3.0))
print(add(1, 2))
print(add(2, 3))
print(add(2, 3.0))
print(add(2, 3.0))
输出结果:
calculating: 1 + 2
3
3
calculating: 2 + 3
5
calculating: 2 + 3.0
5.0
calculating: 1 + 2
3
calculating: 2 + 3
5
calculating: 2 + 3.0
5.0
5.0
查看函数当前的缓存信息可以使用如下方法,比如查看add函数 :
# 查看函数缓存信息
cache_info = add.cache_info()
print(cache_info)
输出结果类似:hits代表命中缓存次数,misses代表未命中缓存次数,maxsize代表允许的最大存储结果数目,currsize表示当前存储的结果数据。
CacheInfo(hits=3, misses=2, maxsize=1, currsize=1)
如果需要考虑过期时间以及线程安全,可以使用下面的方法,基于collections.OrderedDict()实现的示例。
# -*- coding: UTF-8 -*-
"""
# rs
"""
import collections
import threading
import time
# 线程非安全类,未加锁
class LRUCacheNotThreadSafe(object):
"""
# LRU cache
"""
def __init__(self, capacity):
"""
# cache
"""
self.capacity = capacity
self.cache = collections.OrderedDict()
def get_and_clear_expired(self, key, current_timestamp):
"""
# get value and clean expired valued.
"""
try:
(value, expire_time) = self.cache.pop(key)
if expire_time > current_timestamp:
# only when don't expire, we keep this key
self.cache[key] = (value, expire_time)
return True, value
except KeyError:
return False, None
def set(self, key, value, expire_time):
"""
# set value
"""
try:
self.cache.pop(key)
except KeyError:
if len(self.cache) >= self.capacity:
self.cache.popitem(last=False)
self.cache[key] = (value, expire_time)
# 线程安全类,加锁
class LRUCacheThreadSafe(object):
"""
LRU cache,clean expired value only when get.
"""
def __init__(self, capacity):
"""cache
"""
self.capacity = capacity
self.cache = collections.OrderedDict()
self._lock = threading.Lock()
def get_and_clear_expired(self, key, current_timestamp):
"""
# get value and clean expired valued.
:param key:
:param current_timestamp:
:return:
"""
with self._lock:
try:
# 取出key对应的数据
(value, expire_time) = self.cache.pop(key)
# 如果没有过期,再次写入
if expire_time > current_timestamp:
self.cache[key] = (value, expire_time)
return True, value
except KeyError:
return False, None
def set(self, key, value, expire_time):
"""
# set value
"""
with self._lock:
try:
self.cache.pop(key)
except KeyError:
# 超出允许缓存的最大结果数目,先删除再写入
if len(self.cache) >= self.capacity:
self.cache.popitem(last=False)
self.cache[key] = (value, expire_time)
# 缓存实例
lru_ins = LRUCacheThreadSafe(capacity=50)
# 写入1
lru_ins.set("key1", "value1", int(time.time()))
# 查询1
s, v = lru_ins.get_and_clear_expired("key1", int(time.time()))
print(s, v)
# 写入2
lru_ins.set("key1", "value2", int(time.time()))
# 查询2
s, v = lru_ins.get_and_clear_expired("key1", int(time.time()))
print(s, v)
输出结果:
True value1
True value2
参考: