在项目中使用celery
项目结构:
proj/__init__.py # 注意这个文件 django是不带的默认 对应django的项目
/celery.py
/tasks.py
proj/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('proj',
broker='amqp://', # 使用的消息队列
backend='amqp://', # 使用的结果存储
include=['proj.tasks']) # 任务们
if __name__ == '__main__':
app.start()
proj/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y):
return x + y
启动工作进程:
celery -A proj worker -l info
-------------- celery@halcyon.local v4.0 (latentcall)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://guest@localhost:5672//
** ---------- . app: main:0x1012d8590
** ---------- . concurrency: 8 (processes)
** ---------- . events: OFF (enable -E to monitor this worker)
** ----------
*** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
broker就是配置中的消息队列,concurrency就是工作进程数量(默认是cpu数量)如果所有进程都被占用了那么新的任务需要等待,events是指定celery是否发送监控信息,queue就是队列。
终止工作进程
直接control -c(按照上述命令在前台启动的),当然了如果在后台启动和停止:
celery multi start w1 -A proj -l info
celery multi restart w1 -A proj -l info
# 异步关闭 立即返回
celery multi stop w1 -A proj -l info
# 等待关闭操作完成
celery multi stopwait w1 -A proj -l info
默认会在当前目录下创建pid和log文件,指定路径和名称:
celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log
关于-A(--app)选项:
指定celery app,可以用 module.path:attribute的形式指定,也可直接指定package,如--app=proj,然后会搜索其中的proj.app,proj.celery等。
操作已被创建的worker时,不需要参数什么的完全一样,只要pidfile和logfile一样即可。
调用
add.delay(1, 1)
add.apply_async((1, 1))
add.apply_async((2, 2), queue='lopri', countdown=10)
# 指定要发送到哪个队列 运行时间延迟countdown
# 结果获得
res = add.delay(1, 1)
res.get()
celery默认不产生结果的原因:因为各种应用的需求不同,而大多数任务保存返回值没有什么意义,而且产生结果并不是用来监控任务和工作进程,应该是使用event消息的专门的监控模块。
res.id # 任务id uuid
# 任务出错会raise 错误 可指定propagate
res.get(propagate=False)
res.failed()
res.successful()
res.state
# PENDING -> STARTED -> SUCCESS(FAILURE)
# STARTED 需要设置 task_track_started 或在任务级别设置@task(track_started=True)
# 重试的情况也有
# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
工作流
有时想把一个任务的签名(包含了实参和任务的执行选项)发送到另一个进程或把参数发送到另一个函数。
add.signature((2, 2), countdown=10) # with args and kwargs
add.s(2, 2) # with args only
然后获得的签名就可以调用:
s1 = add.s(2, 2)
res = s1.delay()
res.get()
# 也可以部分指定参数 然后在调用时补全参数
s2 = add.s(2)
res = s2.delay(8)
res.get()
调用方式
之前一直使用的是delay(*args, **kwargs)的方式来调用,还有apply_async的调用方式,支持一些执行时的选项
Groups
group是同时调用的一系列任务,返回特殊的结果可以获得组内所有的执行结果,可以按照任务的顺序从该结果中获得对应任务的结果。
group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# 组也支持部分参数化
g = group(add.s(i) for i in xrange(10))
g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains
把任务连接起来,执行完一个后把结果传入另一个再调用。
chain(add.s(4, 4) | mul.s(8))().get()
g = chain(add.s(4) | mul.s(8))
g(4).get()
# 也支持部分参数化
chords
chord是一个带callback的group
from celery import chord
from proj.tasks import add, xsum
chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
# 而group的结果发送到另一个任务则自动转换为chord
(group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
# 可以把单任务的结果发送到组中
upload_doc.s(file) | group(apply_filter.s() for filter in filters)
Routing
在app层面配置task到某个队列:
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
在调用时指定使用某个队列:
add.apply_async((2, 2), queue='hipri')
启动工作进程时指定该进程处理的队列:
celery -A proj worker -Q hipri
# 指定多个处理的队列 默认队列名称:celery
celery -A proj worker -Q hipri,celery
远程控制
查看工作进程当前的工作:
celery -A proj inspect active
这个命令是一个广播,所有工作进程都会收到。(需要启用event)
celery -A proj inspect active --destination=celery@example.com
让工作进程启用event
celery -A proj control enable_events