本文主要分析 redis-py
源码,阐述 redis-py
连接过程和如何获取响应
先来看一下 redis-py 的项目结构,结构比较简单,也很清晰。
.
├── __init__.py
├── _compat.py python2.x向后兼容的内部模块
├── client.py 实现redis客户端
├── connection.py 实现连接池类,连接类和解析类
├── exceptions.py 异常模块
├── lock.py 实现共享、分布式锁
├── sentinel.py redis高可用客户端
└── utils.py 一些其他模块
我们从一个完整的请求开始阅读源码
In [1]: import redis
In [2]: r = redis.Redis(host='localhost', port=6379, db=1)
In [3]: r.ping()
Out[3]: True
根据示例代码,将分三个阶段解读源码:实例化、发送命令、获取响应
实例化的过程就是创建一个客户端的过程,使用起来比较方便,只需要实例化 Redis
类即可
In [1]: import redis
In [2]: r = redis.Redis(host='localhost', port=6379, db=0)
那么 Redis
类的初始化都做了什么呢?看下面的源码:
class Redis(object):
"""
Implementation of the Redis protocol.
This abstract class provides a Python interface to all Redis commands
and an implementation of the Redis protocol.
Connection and Pipeline derive from this, implementing how
the commands are sent and received to the Redis server
"""
def __init__(self, host='localhost', port=6379, db=0, connection_pool=None, max_connections=None):
if not connection_pool:
kwargs = {
'db': db,
'host': host,
'port': port,
'max_connections': max_connextions,
...
}
connection_pool = ConnectionPool(**kwargs)
self.connection_pool = connection_pool
上面的代码我省略了一些本次分析不需要的代码,目的是让我们的分析过程更明确。
我们在实例化 Redis
类时只传入了 redis-server
的地址参数,未指定连接池类,所以初始化的过程中实际上会先创建一个连接池。显然,我们不能忽略连接池的创建细节。
class ConnectionPool(ojbect):
"Generic connection pool"
def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs):
max_connections = max_connections or 2 ** 31
if not isinstance(max_connections, (int, long)) or max_connections < 0:
raise ValueError('"max_connections" must be a positive integer')
self.connection_class = connection_class
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections
self.reset()
def reset(self):
self.pid = os.getpid()
self._created_connections = 0
self._available_connections = []
self._in_use_connections = set()
self._check_lock = threading.Lock()
连接池在初始化时会接收一个类,默认是 Connection
类,然后设置最大连接数,最后需要调用 reset
函数,reset
函数则设置了进程id,线程锁和一些其他属性。
到现在,还没有真正建立连接。
实例化结束之后,我们获得了 Redis
的一个实例,但是还没有真正的建立连接,现在我直接调用客户端实例的 ping
方法,返回 True
,我们先看一下 ping()
发生了什么?
In [3]: r.ping()
Out[3]: True
调用 ping()
之后,立即返回了 True
。这个过程需要分开讨论,先解决发送命令。
def ping(self):
"Ping the Redis server"
return self.execute_command('PING')
# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
connection = pool.get_connection(command_name, **options)
try:
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
except (ConnectionError, TimeoutError) as e:
pass
finally:
pool.release(connection)
ping()
方法直接调用了 execute_command()
,Redis
实现的所有的命令都是通过execute_command
发出去的。总结一下上面的代码:
下面的源码虽然简化了很多,但还是比较长,简单的说一下:
connect()
方法,目的是确保连接可用can_read()
方法检查是否有未读数据来确定该连接是否准备好发送命令# 这里省略了很多细节,感兴趣的可以看一下源码
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
try:
connection = self._avaliable_connections.pop()
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
try:
connection.connect()
except:
self.release(connection)
raise
def make_connection(self):
"Create a new connection"
if self._created_connections >= self.max_connections:
raise ConnectionError("Too many connections")
self._created_connections += 1
return self.connection_class(**self.connection_kwargs)
class Connection(object):
"Manages TCP communication to and from a redis server"
descritption_format = "Connection<host=%(host)s,port=%(port)s,db=%(db)s>"
def __init__(self, host='localhost', port=6379, db=0, parser_class=DefaultParser, socket_read_size=65536, **kwargs):
self.host = host
self.port = int(port)
self.db = db
self._parser = parser_class(socket_read_size=socket_read_size)
self._sock = None
self._description_args = {
'host': self.host,
'port': self.port,
'db': self.db
}
def connect(self):
"Connects to the Redis server if not already connected"
if self._sock:
return
sock = self._connect()
self._sock = sock
self.on_connect()
def _connect(self):
"Create a TCP socket"
# 简化一下socket连接过程
for res in socket.getaddrinfo(self.host, self.port, self.socket_type, socket.SOCK_STREAM):
family, socktype, proto, cannoname, socket_address = res
sock = socket.socket(family, socktype, proto)
sock.setsockopt(socket.IPPROTO_TCP, sockte.TCP_NODELAY, 1)
sock.connect(socket_address)
return sock
def on_connect(self):
"Initialize the connection, authenticate and select a database"
self._parser.on_connect(self) # 对服务器返回的数据进行解析
if self.db:
self.send_command('SELECT', self.db)
上面说了一大堆,可能已经忘了我们走到哪一步了,我们在调用客户端实例的 ping()
方法过程中,先去建立一个真正的连接,现在我们已经有了真实的连接对象,也就是socket对象,那么,接下来我们调用 send_command()
方法去发送我们的 ping
命令,直接看源码:
def send_command(self, *args):
"Pack and send a command to the redis server"
self.send_packed_command(self.pack_command(*args))
def pack_command(self, *args):
"Pack a series of arguments into the Redis protocol"
return [b'*1\r\n*\r\nping\r\n']
def send_packed_command(self, command):
"Send an already packed command to the Redis server"
for item in command:
self._sock.sendall(item)
命令打包函数的具体细节可以以后研究,现在我们知道 send_command()
函数会把命令参数打包,返回一个列表,列表元素是一个字节类型的字符串,字符串包含了我们要发送的命令,然后调用 send_packed_command()
函数,发送命令的函数我省略了细节,最后,通过套接字的 sendall()
方法发送出去,到此为止,我们的 ping
命令已经发给了 redis
服务器。
上面,我们的ping
命令已经发送给服务器了,那么如何获取响应呢?看看上面的 execute_command()
方法,return self.parse_response(connection, command_name, **options)
,所以,响应是在 Redis
的实例方法 parse_response()
返回。继续看源码
def parse_response(self, connection, command_name, **options):
"Parses a response from the redis server"
return connection.read_response()
def read_response(self):
"Read the response from previously send command"
return self._parser.read_response()
响应需要解析对象去获取,_parser
可能是 HiredisParser
对象,也可能是 PthoneParser
对象,取决于是否安装了 hiredis
。hiredis
对于大批量的数据解析可以提升10倍的速度,这里我们先看一下未使用 hiredis
是如何获取响应的。
# parser对象已经在建立连接的时候动态添加了 _sock 和 _buffer 属性
def read_response(self):
response = self._buffer.readline() # io.BytesIO()
byte, response = byte_to_chr(response[0]), response[1:]
if isinstance(response, bytes):
response = self.encoder.decode(response)
return response
解析响应的过程实际上就是从内存读取响应的过程,当内存中没有数据时,会直接调用 socket.recv()
方法获取数据,然后写入内存。
数据获取完成之后,就需要把当前的连接释放掉,具体的操作就是把这个连接对象放回到连接池中。
def release(self, connection):
"Releases the connection back to the pool"
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection)
self._available_connections.append(connection)
这样,一次完整的请求结束了。。。