rq worker 就是python 进程,主要任务是从执行长时间、或是阻塞任务。
在项目根目录下,执行如下命令:rq worker high normal low
(上述命令的含义是:启动rq worker,并监听 high,normal ,low 三个任务队列)
*** Listening for work on high, normal, low
Got send_newsletter('me@nvie.com') from default
Job ended normally without result
*** Listening for work on high, normal, low
...
之后,worker 会不断的从给定的队列中读取任务,并执行任务。当所有任务都完成时,会阻塞等待新任务的到来。
注意:每个worker同一时刻只会处理一个任务。在一个worker中,不存在同步问题。如果你想同步执行多个任务,则需要启动多个worker.
worker的burst 模式
默认情况下,worker 启动后,会里面的开始工作,处理任务,然后当任务都执行完后,会进入等待。
另外,worker也可以以“burst"模式启动,在该模式下,worker 会完成当前所有的任务后,就立马退出。启动方式:rq worker --burst high normal low
rq worker命令支持如下参数:
--url or -u:指定Redis 链接信息,例如:rq worker --url redis://:secrets@example.com:1234/9)
--path or -P: 导入路径
--config or -c: RQ配置路径
--worker-class or -w: 自定义的RQ worker class 类
--job-class or -j: RQ Job class to use.
--queue-class: RQ Queue class to use.
--connection-class: Redis 链接类,默认为: redis.StrictRedis.
--log-format: 日志公式,默认为: '%(asctime)s %(message)s'
--date-format: 日期格式,默认为: '%H:%M:%S'
--disable-job-desc-logging: 关闭任务信息日志
另外,你可以通过配置文件来配置worker。例如如下配置:settings.py
REDIS_URL = 'redis://localhost:6379/1'
# You can also specify the Redis DB to use
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'
# Queues to listen on
QUEUES = ['high', 'normal', 'low']
# If you're using Sentry to collect your runtime exceptions, you can use this
# to configure RQ for it in a single step
# The 'sync+' prefix is required for raven: https://github.com/nvie/rq/issues/350#issuecomment-43592410
SENTRY_DSN = 'sync+http://public:secret@example.com/1'
# If you want custom worker name
# NAME = 'worker-1024'
worker 进程的生命周期如下:
busy
,表明自己准备要执行任务。并将要执行的任务在StartedJobRegistry
中进行注册。idle
,并设置job的执行结果。另外, 在StartedJobRegistry
中取消注册,在FinishedJobRegistry
中对任务进行注册。RQ 任务(job) 是Python对象,表示将要在worker中执行的函数(任何可执行的函数都可以,没有特别的要求。将函数即函数参数压入Redis 队列中,即可返回一个Job 对象。
import requests
from rq import Queue
from redis import Redis
#定义任务函数
def count_words_at_url(url):
resp = requests.get(url)
return len(resp.text.split())
# Tell RQ what Redis connection to use
redis_conn = Redis()
q = Queue(connection=redis_conn)
#将可执行函数压入队列中
job = q.enqueue(count_words_at_url, 'http://nvie.com')
print(job.result) # => None
# Now, wait a while, until the worker is finished
time.sleep(2)
print(job.result) # => 889
@job装饰器
当将任务压入RQ Queue 时,就会返回一个Job 示例。可以使用该job 示例来获取函数返回结果(如果job 运行结束、且任务函数有返回值的化)
使用@job装饰器,可以将一个函数修饰为一个任务
from rq.decorators import job
@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
return x + y
job = add.delay(3, 4)
time.sleep(1)
print(job.result)
@job修饰器的主要作用就是给被修饰的函数对象添加delay函数属性。在delay函数中,调用enqueue_call
函数,将被修饰函数添加到队列中。
另外,RQ 任务执行依赖处理,例如:
q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)
import requests
def count_words_at_url(url):
resp = requests.get(url)
return len(resp.text.split())
from redis import Redis
from rq import Queue
q = Queue(connection=Redis())
from my_module import count_words_at_url
result = q.enqueue(
count_words_at_url, 'http://nvie.com')