当前位置: 首页 > 工具软件 > redis-py > 使用案例 >

redis系列: python客户端 redis-py 功能简介

戚俊健
2023-12-01

引言
redis-py 是为了连接redis服务器而实现的 python-redis 客户端,因此本文需要有 redis-server 的支持。

  1. 安装
    建议在python虚拟环境下安装以避免python包冲突
    pip install redis

  2. 开始使用

> import redis
> r = redis.Redis(host='localhost', port=6379, db=0)
> r.set('key', 'value')
True
> r.get('key')
b'value'

说明: 在python3.x中,所有的字符串响应返回bytes类型的数据;在python2.x中,所有的字符串响应返回str类型的数据。可以在实例化Redis时设置关键字参数 decode_responses=True ,这样所有的字符串响应会使用指定的编码方式 (默认encoding='utf-8') 进行解码后返回字符串数据。

> import redis
> r = redis.Redis(host='localhost', port=6379, db=0, decode_response=True, encoding='utf-8')
> r.set('key', 'value')
True
> r.get('key')
'value'
  1. 连接池
    redis-py 使用连接池管理对 redis-server 的连接,默认情况下,每个 Redis 类的实例都会轮流创建自己的连接池,可以通过自己创建连接池实例并配置到 Redis 类实例化方法中来复用连接池。
>  import redis
>  pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
>  r = redis.Redis(connection_pool=pool)
  1. 连接
    连接池管理多个连接实例。redis-py 有两种类型的连接,默认是基于 TCP 套接字的连接。当客户端和服务器运行在一个设备上时可以用过 UnixDomainSocketConnection 建立连接,即 unix domain socket。通过配置参数 unix_socket_path='/tmp/redis.sock' 使用 UnixDomainSocketConnection,值是描述 unix domain 套接字文件的字符串,另外需要确保 unixsocket 已经在 redis.conf 中定义。
> r = redis.Redis(unix_socket_path='/tmp/redis.sock')

如果想控制socket在异步框架中的行为可以创建 Connection 的子类。通过参数 connection_class 绑定自定义的连接子类,需要注意的是,传递个连接池初始化的其他关键字参数会自动传给指定的子类。

> pool = redis.ConnectionPool(connection_class=YourConnectionClass, your_args=...)
  1. 解析器
    解析器可以控制如何解析 redis-server 的响应数据,redis-py 提供了两种解析类 PythonParserHiRedisParser,如果当前环境已安装 hiredis 模块,则使用 HiRedisParser,否则会使用 PythonParser
    HiRedisParser 是由 redis 核心团队维护的 c 库,可以提高10倍的响应速度,尤其是获取大批量数据时,性能的提升更明显。例如 LRANGESMEMBERS 等操作。
> pip install hiredis
  1. 响应回调
    客户端类使用一组回调将 redis-server 的响应强制转换成适当的 Python 类型。客户端类在属性为 RESPONSE_CALLBACKS 的字典中定义了这些回调。
    通过使用 set_response_callback 方法基于每个实例添加回调,方法接受两个参数: 命令名称、回调函数。这种方式添加的回调只对当前实例有效,如果想定义全局回调,需要自己实现 Redis 客户端子类,然后添加自定义回调到 RESPONSE_CALLBACKS 的字典中。
    回调函数至少要有一个接受 redis-server 响应的参数,也可以接受关键字参数以便进一步控制响应,关键字参数在命令调用 execute_command 期间被指定,可以参考 ZRANGE 命令介绍关键字参数 withscores 的实现。

  2. 线程安全
    redis 是线程安全的。在内部,连接实例仅在命令执行期间从连接池中检索,然后直接返回池。命令的执行不会修改连接实例的状态。
    警告: 使用 SELECT 命令需要注意的是,更改数据库后会返回该连接实例到一个连接不同数据库的连接池中。因此,redis-py 客户端没有实现 SELECT 命令,如果在你的应用中需要使用多个数据库,你应该为每个数据库创建一个连接实例(或者连接池)。
    在线程间传递 PubSub or Pipeline对象是不安全的。

  3. 管道
    管道 PipelinesRedis 基类的子类,它为在单个请求中想服务器缓冲多条命令提供支持。它通过减少 Redis 服务器和客户端 TCP 包的数量能显著提高组命令的性能。
    使用:

> import redis
> r = redis.Redis(...)  # 使用redis实例
> r.set('key', 'value')  # 使用redis实例
> 
> # 使用redis实例去创建一个pipeline实例
> pipe = r.pipeline()
> 
> # 设置buffer命令
> pipe.set('key', 'value')
> pipe.get('key')
>
> # 执行
> pipe.execute()
[True, 'value']

缓冲到 pipeline 中的所有命令都会返回到 pipeline 对象本身,因此可以链式调用:

> pipe.set('name', 'redis-py').sadd('rank', 1).incr('age').execute()
[True, True, 6]

管道还能确保一组命令原子执行。可以通过关闭事务来禁止原子执行:

> pipe = r.pipeline(transaction=False)

一个常见的问题是,在一个要求原子事务中,我们需要先检索值然后在事务中使用这个值。假设没有 INCR 命令,我们需要再 python 中构建 INCR 的原子版本。
完成的实现的话,先获取值,然后增值,最后设置并返回。但这不是原子操作,因为多个客户端可以同事这样做。
WATCH 命令,WATCH 命令提供了在一个事务开始之前监视一个或多个键的功能,如果任何一个键在事务执行之前发生了改变,则取消整个事务并引发 WATCHERROR

>  with r.pipeline as pipe:
···    while True:
···        try:
···            pipe.watch('OUR-SEQUENCE-KEY')
···            # watch之后,pipeline会被放入立刻执行模块中,知道我们主动控制其开始缓冲命令
···            cur = pipe.get('OUR-SEQUENCE-KEY')
···            next_value = int(cur) + 1
···            # 使用multi让pipeline返回到缓冲命令模式
···            pipe.multi()
···            pipe.set('OUR-SEQUENCE-KEY', next_value)
···            pipe.execute()
···            break
···        except WatchError:
···            continue

WATCH 期间,管道必须绑定一个单独的连接,必须注意要调用 reset 方法确保连接返回到连接池中。使用上下文管理器时会自动调用 reset 方法。

>  pipe = r.pipeline()
>  while True:
···     try:
···         pipe.watch('OUR-SEQUENCE-KEY')
···         ...
···         pipe.execute()
···         break
···     except WatchError:
···         continue
···     finally:
···         pipe.reset()

还存在一个 transaction 的方法,用户处理监视错误的样板。该方法可以传入一个回调函数,函数需要接收管道,参数和一组key。

In [41]: def client_side_incr(pipe):
    ...:     current_value = pipe.get('OUR-SEQUENCE-KEY')
    ...:     next_value = int(current_value or 0) + 1
    ...:     pipe.multi()
    ...:     pipe.set('OUR-SEQUENCE-KEY', next_value)
    ...:

In [42]: r.transaction(client_side_incr, 'OUR-SEQUENCE-KEY')
Out[42]: [True]
  1. 发布和订阅
    redis-py 中的 SubPub 对象用来订阅和监听消息。可以通过下面的语句创建一个 SubPub 对象
In [44]: r = redis.Redis(port=32771)
In [45]: p = r.pubsub()

创建 SubPub 对象后,可以订阅,支持模式匹配

In [46]: p.subscribe('my-first-channel', 'my-second-channel')
In [47]: p.psubscribe('my-*')

通过读消息可以查看是否有新的消息

In [48]: p.get_message()
Out[48]:
{'type': 'subscribe',
 'pattern': None,
 'channel': b'my-first-channel',
 'data': 1}

In [49]: p.get_message()
Out[49]:
{'type': 'subscribe',
 'pattern': None,
 'channel': b'my-second-channel',
 'data': 2}

In [50]: p.get_message()
Out[50]: {'type': 'psubscribe', 'pattern': None, 'channel': b'my-*', 'data': 3}

可以再打开一个 redis 客户端,或者使用当前redis实例也可以,发布消息

In [51]: r.publish('my-first-channel', 'some data')
Out[51]: 2
In [52]: p.get_message()

Out[52]:
{'type': 'message',
 'pattern': None,
 'channel': b'my-first-channel',
 'data': b'some data'}

In [53]: p.get_message()
Out[53]:
{'type': 'pmessage',
 'pattern': b'my-*',
 'channel': b'my-first-channel',
 'data': b'some data'}

取消订阅,SubPub 对象提供了 unsubscribepunsubscribe 方法,如果不传入任何参数,则取消订阅全部频道。

In [55]: p.unsubscribe()
In [57]: p.punsubscribe('my-*')

In [58]: p.get_message()
Out[58]:
{'type': 'unsubscribe',
 'pattern': None,
 'channel': b'my-second-channel',
 'data': 2}

In [59]: p.get_message()
Out[59]:
{'type': 'unsubscribe',
 'pattern': None,
 'channel': b'my-first-channel',
 'data': 1}

In [60]: p.get_message()
Out[60]: {'type': 'punsubscribe', 'pattern': None, 'channel': b'my-*', 'data': 0}

订阅回调
通过注册回调函数处理已发布的消息,如果在回调函数中对消息进行读取,则通过 get_message 只能获取到 None 值,因为消息已经被处理。

In [62]: def my_handler(message):
    ...:     print('MY Handler: ', message['data'])
    ...:
In [64]: p.subscribe(**{'my-channel': my_handler})
In [66]: p.get_message()
Out[66]: {'type': 'subscribe', 'pattern': None, 'channel': b'my-channel', 'data': 1}

In [69]: r.pubsh('my-channel', 'awesome data')
Out[70]: 1

In [72]: message = p.get_message()
MY Handler:  b'awesome data'

In [73]: print(message)
None

如果对确认消息 subscribe/unsubscribe 机制不感兴趣,可以设置免打扰,这样只会接受生产者发布的消息,对订阅或者取消订阅的消息自动设置已读。

In [75]: p = r.pubsub(ignore_subscribe_messages=True)

In [76]: p.subscribe('my-channel')

In [77]: p.get_message()

In [78]: r.publish('my-channel', 'my-data')
Out[78]: 1

In [79]: p.get_message()
Out[79]:
{'type': 'message',
 'pattern': None,
 'channel': b'my-channel',
 'data': b'my-data'}

接受消息
有三种机制来接收消息,可以使用快速轮询 socket

In [80]: while True:
    ...:     message = p.get_message()
    ...:     if message:
    ...:         # do something
    ...:         pass
    ...:     time.sleep(0.001)  # be nice to the system.
    ...:

或者,使用 listen 机制,记住,它是堵塞的

In [81]: for message in p.listen():
    ...:     # do something
    ...:     pass
    ...:

第三中,利用线程循环接受消息,可以使用方法 pubsub.run_in_thread(), 这个方法返回线程对象给调用者,调用者使用 thread.stop() 方法停止线程内的事件循环,实际上只是对 get_message() 做了简单的封装,创建一个非阻塞的事件循环。run_in_thread() 可以传入参数指定每次循环睡眠时间。使用该方法必须创建事件回调函数,因为在线程中无法处理消息。

In [82]: p.subscribe(**{'my-channel': my_handler})
In [83]: thread = p.run_in_thread(sleep_time=0.001)
In [84]: thread.stop()  # 停止 event loop

当发生连接异常时,再次连接后会重新订阅所有频道和模式,消息也会重新发出。完成 SubPub 对象的使用时,应该调用 close 方法关闭连接。

In [85]: p.close()
 类似资料: