当前位置: 首页 > 工具软件 > redis-py > 使用案例 >

redis系列: redis-py 源码分析 - 一次完整的请求

郝乐心
2023-12-01

本文主要分析 redis-py 源码,阐述 redis-py 连接过程和如何获取响应

先来看一下 redis-py 的项目结构,结构比较简单,也很清晰。

.
├── __init__.py
├── _compat.py        python2.x向后兼容的内部模块
├── client.py         实现redis客户端
├── connection.py     实现连接池类,连接类和解析类
├── exceptions.py     异常模块
├── lock.py           实现共享、分布式锁
├── sentinel.py       redis高可用客户端
└── utils.py          一些其他模块

我们从一个完整的请求开始阅读源码

In [1]: import redis
In [2]: r = redis.Redis(host='localhost', port=6379, db=1)
In [3]: r.ping()
Out[3]: True

根据示例代码,将分三个阶段解读源码:实例化、发送命令、获取响应

1. 实例化

1.1 代码实现

实例化的过程就是创建一个客户端的过程,使用起来比较方便,只需要实例化 Redis 类即可

In [1]: import redis
In [2]: r = redis.Redis(host='localhost', port=6379, db=0)
1.2 初始化源码

那么 Redis 类的初始化都做了什么呢?看下面的源码:

class Redis(object):
   """
   Implementation of the Redis protocol.
   This abstract class provides a Python interface to all Redis commands
   and an implementation of the Redis protocol.
   Connection and Pipeline derive from this, implementing how
   the commands are sent and received to the Redis server
   """
   def __init__(self, host='localhost', port=6379, db=0, connection_pool=None, max_connections=None):
       if not connection_pool:
           kwargs = {
               'db': db,
               'host': host,
               'port': port,
               'max_connections': max_connextions,
               ...
           }
           connection_pool = ConnectionPool(**kwargs)
       self.connection_pool = connection_pool

上面的代码我省略了一些本次分析不需要的代码,目的是让我们的分析过程更明确。
我们在实例化 Redis 类时只传入了 redis-server 的地址参数,未指定连接池类,所以初始化的过程中实际上会先创建一个连接池。显然,我们不能忽略连接池的创建细节。

1.3 创建连接池
class ConnectionPool(ojbect):
   "Generic connection pool"
   def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs):
       max_connections = max_connections or 2 ** 31
       if not isinstance(max_connections, (int, long)) or max_connections < 0:
           raise ValueError('"max_connections" must be a positive integer')
       self.connection_class = connection_class
       self.connection_kwargs = connection_kwargs
       self.max_connections = max_connections
      
       self.reset()

  def reset(self):
      self.pid = os.getpid()
      self._created_connections = 0
      self._available_connections = []
      self._in_use_connections = set()
      self._check_lock = threading.Lock()

连接池在初始化时会接收一个类,默认是 Connection类,然后设置最大连接数,最后需要调用 reset 函数,reset 函数则设置了进程id,线程锁和一些其他属性。
到现在,还没有真正建立连接。

2. 发送命令

2.1 代码示例

实例化结束之后,我们获得了 Redis 的一个实例,但是还没有真正的建立连接,现在我直接调用客户端实例的 ping 方法,返回 True,我们先看一下 ping() 发生了什么?

In [3]: r.ping()
Out[3]: True

调用 ping() 之后,立即返回了 True。这个过程需要分开讨论,先解决发送命令。

2.2 发送命令源码
def ping(self):
    "Ping the Redis server"
    return self.execute_command('PING')

# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
    "Execute a command and return a parsed response"
    pool = self.connection_pool
    command_name = args[0]
    connection = pool.get_connection(command_name, **options)
    try:
        connection.send_command(*args)
        return self.parse_response(connection, command_name, **options)
    except (ConnectionError, TimeoutError) as e:
        pass
    finally:
        pool.release(connection)

ping() 方法直接调用了 execute_command()Redis 实现的所有的命令都是通过execute_command 发出去的。总结一下上面的代码:

  • 获取连接对象
  • 发送命令
  • 解析响应
  • 释放连接
2.2.1 获取连接对象

下面的源码虽然简化了很多,但还是比较长,简单的说一下:

  1. 获取连接对象,先检查一下是否在当前进程
  2. 然后从当前可用连接队列中弹出一个连接,如果没有,则创建一个新的连接实例
  3. 创建新的连接实例,实际上是创建一个socket连接对象。
  4. 用连接实例的 connect() 方法,目的是确保连接可用
  5. 通过 can_read() 方法检查是否有未读数据来确定该连接是否准备好发送命令
  6. 然后返回该连接实例。
# 这里省略了很多细节,感兴趣的可以看一下源码
def get_connection(self, command_name, *keys, **options):
    "Get a connection from the pool"
    self._checkpid()
    try:
        connection = self._avaliable_connections.pop()
    except IndexError:
        connection = self.make_connection()
    self._in_use_connections.add(connection)
    try:
        connection.connect()
    except:
        self.release(connection)
        raise
        
def make_connection(self):
   "Create a new connection"
   if self._created_connections >= self.max_connections:
       raise ConnectionError("Too many connections")
   self._created_connections += 1
   return self.connection_class(**self.connection_kwargs)


class Connection(object):
   "Manages TCP communication to and from a redis server"
   descritption_format = "Connection<host=%(host)s,port=%(port)s,db=%(db)s>"
   
   def __init__(self, host='localhost', port=6379, db=0, parser_class=DefaultParser, socket_read_size=65536, **kwargs):
        self.host = host
        self.port = int(port)
        self.db = db
        self._parser = parser_class(socket_read_size=socket_read_size)
        self._sock = None
        self._description_args = {
            'host': self.host,
            'port': self.port,
            'db': self.db
        }
        
    def connect(self):
       "Connects to  the Redis server if not already connected"
       if self._sock:
           return
       sock = self._connect()
       self._sock = sock
       self.on_connect()
       
   def _connect(self):
       "Create a TCP socket"
       # 简化一下socket连接过程
       for res in socket.getaddrinfo(self.host, self.port, self.socket_type, socket.SOCK_STREAM):
           family, socktype, proto, cannoname, socket_address = res
           sock = socket.socket(family, socktype, proto)
           sock.setsockopt(socket.IPPROTO_TCP, sockte.TCP_NODELAY, 1)
           sock.connect(socket_address)
           return sock
           
   def on_connect(self):
       "Initialize the connection, authenticate and select a database"
       self._parser.on_connect(self)  # 对服务器返回的数据进行解析
       if self.db:
           self.send_command('SELECT', self.db)
2.2.2 发送命令

上面说了一大堆,可能已经忘了我们走到哪一步了,我们在调用客户端实例的 ping() 方法过程中,先去建立一个真正的连接,现在我们已经有了真实的连接对象,也就是socket对象,那么,接下来我们调用 send_command() 方法去发送我们的 ping 命令,直接看源码:

def send_command(self, *args):
    "Pack and send a command to the redis server"
    self.send_packed_command(self.pack_command(*args))

def pack_command(self, *args):
    "Pack a series of arguments into the Redis protocol"
    return [b'*1\r\n*\r\nping\r\n']

def send_packed_command(self, command):
    "Send an already packed command to the Redis server"
    for item in command:
        self._sock.sendall(item)

命令打包函数的具体细节可以以后研究,现在我们知道 send_command() 函数会把命令参数打包,返回一个列表,列表元素是一个字节类型的字符串,字符串包含了我们要发送的命令,然后调用 send_packed_command() 函数,发送命令的函数我省略了细节,最后,通过套接字的 sendall() 方法发送出去,到此为止,我们的 ping 命令已经发给了 redis 服务器。

2.2.3 解析响应

上面,我们的ping命令已经发送给服务器了,那么如何获取响应呢?看看上面的 execute_command() 方法,return self.parse_response(connection, command_name, **options),所以,响应是在 Redis 的实例方法 parse_response() 返回。继续看源码

def parse_response(self, connection, command_name, **options):
    "Parses a response from the redis server"
    return connection.read_response()

def read_response(self):
    "Read the response from previously send command"
    return self._parser.read_response()

响应需要解析对象去获取,_parser 可能是 HiredisParser 对象,也可能是 PthoneParser 对象,取决于是否安装了 hiredishiredis 对于大批量的数据解析可以提升10倍的速度,这里我们先看一下未使用 hiredis 是如何获取响应的。

# parser对象已经在建立连接的时候动态添加了 _sock 和 _buffer 属性
def read_response(self):
    response = self._buffer.readline()   # io.BytesIO()
    byte, response = byte_to_chr(response[0]), response[1:]
    if isinstance(response, bytes):
        response = self.encoder.decode(response)
    return response

解析响应的过程实际上就是从内存读取响应的过程,当内存中没有数据时,会直接调用 socket.recv() 方法获取数据,然后写入内存。

2.2.4 释放连接

数据获取完成之后,就需要把当前的连接释放掉,具体的操作就是把这个连接对象放回到连接池中。

    def release(self, connection):
        "Releases the connection back to the pool"
        self._checkpid()
        if connection.pid != self.pid:
            return
        self._in_use_connections.remove(connection)
        self._available_connections.append(connection)

这样,一次完整的请求结束了。。。

 类似资料: