当前位置: 首页 > 工具软件 > Redis Queue > 使用案例 >

Redis Queue(rq)

元玮
2023-12-01

参考文献:http://python-rq.org/


Redis Queue 是一个轻量级的python 库,用于将任务放入到队列将,并在后台异步执行。Redis Queue依赖Redis ,且Redis 的版本要求>=3.0.0

1.基本概念与基础对象

1.1 worker

rq worker 就是python 进程,主要任务是从执行长时间、或是阻塞任务。

1.1.1 启动worker

在项目根目录下,执行如下命令:rq worker high normal low
(上述命令的含义是:启动rq worker,并监听 high,normal ,low 三个任务队列)

*** Listening for work on high, normal, low
Got send_newsletter('me@nvie.com') from default
Job ended normally without result
*** Listening for work on high, normal, low
...

之后,worker 会不断的从给定的队列中读取任务,并执行任务。当所有任务都完成时,会阻塞等待新任务的到来。

注意:每个worker同一时刻只会处理一个任务。在一个worker中,不存在同步问题。如果你想同步执行多个任务,则需要启动多个worker.

worker的burst 模式
默认情况下,worker 启动后,会里面的开始工作,处理任务,然后当任务都执行完后,会进入等待。

另外,worker也可以以“burst"模式启动,在该模式下,worker 会完成当前所有的任务后,就立马退出。启动方式:rq worker --burst high normal low

1.1.2 Worker 启动参数

rq worker命令支持如下参数:

--url or -u:指定Redis 链接信息,例如:rq worker --url redis://:secrets@example.com:1234/9)

--path or -P: 导入路径

--config or -c: RQ配置路径

--worker-class or -w:  自定义的RQ worker class 类

--job-class or -j: RQ Job class to use.

--queue-class: RQ Queue class to use.

--connection-class: Redis 链接类,默认为: redis.StrictRedis.

--log-format: 日志公式,默认为: '%(asctime)s %(message)s'

--date-format: 日期格式,默认为: '%H:%M:%S'

--disable-job-desc-logging: 关闭任务信息日志

另外,你可以通过配置文件来配置worker。例如如下配置:settings.py

REDIS_URL = 'redis://localhost:6379/1'

# You can also specify the Redis DB to use
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'

# Queues to listen on
QUEUES = ['high', 'normal', 'low']

# If you're using Sentry to collect your runtime exceptions, you can use this
# to configure RQ for it in a single step
# The 'sync+' prefix is required for raven: https://github.com/nvie/rq/issues/350#issuecomment-43592410
SENTRY_DSN = 'sync+http://public:secret@example.com/1'

# If you want custom worker name
# NAME = 'worker-1024'

1.1.3 worker 的生命周期

worker 进程的生命周期如下:

  • 启动,主要是加载python 环境;
  • 注册。worker 将自己注册到RQ 系统中;
  • 开始监听。从Redis 队列中取任务。如果worker 工作在burst 模式下,且所有监听的队列是空的,则worker 进程退出,进程结束。否则,等待任务到来;
  • 准备执行任务。worker 将自己的状态设置为busy,表明自己准备要执行任务。并将要执行的任务在StartedJobRegistry中进行注册。
  • 启动子进程。利用子进程执行任务。
  • 任务执行后的清理工作。worker设置自己的状态为idle,并设置job的执行结果。另外, 在StartedJobRegistry中取消注册,在FinishedJobRegistry中对任务进行注册。
  • 循环,继续监听任务队列。

1.2 任务

RQ 任务(job) 是Python对象,表示将要在worker中执行的函数(任何可执行的函数都可以,没有特别的要求。将函数即函数参数压入Redis 队列中,即可返回一个Job 对象。

import requests
from rq import Queue
from redis import Redis

#定义任务函数
def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

# Tell RQ what Redis connection to use
redis_conn = Redis()
q = Queue(connection=redis_conn)  

#将可执行函数压入队列中
job = q.enqueue(count_words_at_url, 'http://nvie.com')
print(job.result)   # => None

# Now, wait a while, until the worker is finished
time.sleep(2)
print(job.result)   # => 889

@job装饰器
当将任务压入RQ Queue 时,就会返回一个Job 示例。可以使用该job 示例来获取函数返回结果(如果job 运行结束、且任务函数有返回值的化)

使用@job装饰器,可以将一个函数修饰为一个任务

from rq.decorators import job
@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

job = add.delay(3, 4)
time.sleep(1)
print(job.result)

@job修饰器的主要作用就是给被修饰的函数对象添加delay函数属性。在delay函数中,调用enqueue_call函数,将被修饰函数添加到队列中。

另外,RQ 任务执行依赖处理,例如:

q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)

2. 使用步骤

  • 启动Redis数据库,也可以使用已有的Redis实例
  • 编写你的任务函数
import requests
def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())
  • 创建RQ 队列,并绑定使用的Redis
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())
  • 将任务放入RQ队列中
from my_module import count_words_at_url
result = q.enqueue(
             count_words_at_url, 'http://nvie.com')
 类似资料:

相关阅读

相关文章

相关问答