原文链接:https://www.celerycn.io/ru-men/celery-jin-jie-shi-yong
Celery 初次使用章节简单的说明了一下 Celery 的基本使用,本章节将更详细的介绍和使用 Celery,其中包含在自己的应用程序中和库中使用 Celery。
本章节结尾记录有 Celery 的所有功能和最佳实战,建议继续阅读“用户指南”。
项目的结构:
proj/__init__.py
/celery.py
/tasks.py
proj/celery.py
proj/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('proj',
broker = 'amqp://',
backend = 'amqp://',
include = ['proj.tasks'])
# Optional configuration, see the application user guide
app.conf.update(
result_expires = 3600,
)
if __name__ == '__main__':
app.start()
在此程序中,创建了Celery 实例(也称 app),如果需要使用 Celery,导入即可。
proj/tasks.py
from __future__ import absolute_import, unicode_literals
from proj.celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
Celery程序可以用于启动职称(Worker):
$ celery -A proj worker -l info
当职程(Worker)开始运行时,可以看到一部分日志消息:
-------------- celery@halcyon.local v4.0 (latentcall)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://guest@localhost:5672//
- ** ---------- . app: __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events: OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
celery worker --help
职程(Worker)文档:Workers Guide 章节中详细描述了配置项的使用。
链接:https://www.celerycn.io/yong-hu-zhi-nan/zhi-cheng-worker-wen-dang-workers-guide
使用Control+C就可以停止职程(Worker)。上面链接也描述了职程(Worker)支持的信号列表。
在生产环境中,如果需要后台运行职程(Worker),可以参阅 https://www.celerycn.io/yong-hu-zhi-nan/shou-hu-jin-cheng-daemonization。
可以使用celery multi命令在后台启动一个或多个职程(Worker):
$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
> w1.halcyon.local: OK
也可以进行重启:
$ celery multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
> w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64052
停止运行:
$ celery multi stop w1 -A proj -l info
stop 命令是异步的,所以不会等待职程(Worker)关闭。可以通过 stopwait 命令进行停止运行,可以保证在退出之前完成当前正在执行的任务:
$ celery multi stopwait w1 -A proj -l info
注意:celery multi不存储有关职程的信息,所以在重启时需要使用相同的命令参数,停止运行时只能通过pidfile和logfile参数来进行停止运行。
默认情况下会在当前目录中创建pid文件和日志文件,为防止多个职程(Worker)干扰,建议将这些文件存放在专门的目录中:
$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log
也可以使用 multi 命令启动多个职程(Worker),有一个强大的语法为不同职程(Worker)设置不同的参数:
$ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \
-Q default -L:4,5 debug
使用–app参数可也指定运行的Celery应用程序实例,格式必须为
module.path:attribute
但如果只设置包名,它将进行搜索app实例,顺序如下:
用 --app=proj:
你可以使用delay()方法进行调用:
>>> add.delay(2, 2)
delay() 实际上为 apply_async() 的快捷使用:
>>> add.apply_async((2, 2))
apply_async() 可以指定调用时执行的参数,例如运行的时间,使用的任务队列等:
>>> add.apply_async((2, 2), queue='lopri', countdown=10)
上面的实例中,任务被下发到 lopri 队列中,任务下发之后会在最早10秒内执行。 直接调用任务函数进行执行任务,不会发送任何任务消息:
>>> add(2, 2)
4
delay() apply_async() 以及 apply(call) 为 Celery 调用的API,也可以用于签名。
调用任务:Calling Tasks 章节中详细的描述了 Calling API 使用。
每一个任务被调用时会复制一个任务的ID(UUID)。
delay()和apply_async()方法会返回一个AsyncResult实例,可以用于进行跟踪任务状况。如果进行跟踪任务状态,需要设置一个结果后端,以便于存储。
默认情况下禁用结果,因为没有一个适合所有应用程序的结果后端,对于大量的任务来说,保存返回内容不是非常有用的,所以该默认值是一个比较合理的。另外,结果后端不是用于监控任务以及职程(Worker),Celery 有专用的事物消息来进行监控(详情请参阅:监控和管理手册:Monitoring and Management Guide)。
如果配置了结果后端,可以获取任务的返回值:
>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4
也可以通过 id 属性进行获取任务的ID:
>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114
如果任务执行引发异常,可以进行检查异常以及溯源,默认情况下 result.get() 会抛出异常:
>>> res = add.delay(2)
>>> res.get(timeout=1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/celery/celery/result.py", line 113, in get
interval=interval)
File "/opt/devel/celery/celery/backends/rpc.py", line 138, in wait_for
raise meta['result']
TypeError: add() takes exactly 2 arguments (1 given)
如果不希望 Celery 抛出异常,可以通过设置 propagate 来进行禁用:
>>> res.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)
在这种情况下,他可以返回引发错误的实例,需要检查任务是否执行成功还是失败,可以通过在结果实例中使用对应的方法:
>>> res.failed()
True
>>> res.successful()
False
如何知道任务是否执行失败?可以通过查看任务的 state 进行查看:
>>> res.state
'FAILURE'
一个任务只能有当前只能有一个状态,但他的执行过程可以为多个状态,一个典型的阶段是:
PENDING -> STARTED -> SUCCESS
启动状态是一种比较特殊的状态,仅在 task_track_started 启用设置或 @task(track_started=True)的情况下才会进行记录。 挂起状态实际上不是记录状态,而是未知任务ID的默认状态,可以从此实例中看到:
>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'
重试任务比较复杂,为了证明,一个任务会重试两次,任务的阶段为:
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
更多任务状态信息可以查阅用户指南中的任务:Tasks章节的State部分内容。
更多调用任务信息可以参阅调用任务:Calling Tasks。
通过上面的实例削了使用delay方法进行调用人物,有时候可能希望将任务调用的签名传递给另外一个进程或其他函数的参数,Celery提供了一种叫签名的东西。
签名通过一种方式进行封装任务调用的参数以及执行选项,便于传递给他的函数,甚至通过序列化通过网络传输。
可以将 add 使用的参数作为任务创建的签名,倒计时为 10 秒,如下所示(2,2):
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
也可以通过一个快捷的方式进行操作:
>>> add.s(2, 2)
tasks.add(2, 2)
签名实例支持调用API:这就意味着可以使用 delay 和 apply_async 方法。 但区别就在于签名实例已经指定了参数签名,该 add 任务有两个参数,需要指定两个参数的签名才能够成一个完整的签名实例:
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4
也可以创建不完整的签名来进行创建,我称之为 partials 的内容:
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10
在这里,设置了设置了参数值为 8,它位于参数值为 2 的签名,形成了完整的 add(8,2) 签名。 也可以设置新的参值,新设置的参数会覆盖原有的参数值:
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False) # debug is now False.
如上所述,签名支持调用API:
这些原语本身就是签名对象,可以通过任何进行组合,形成复杂的工作流。
**Note:**在下面的这些实例中,如果要运行,需要配置一个结果后端。
让我们一起来看一些例子:
组:Groups
一个 group 并行调用任务列表,返回一个特殊的结果实例,可以将结果作为一个列表进行查看,并且通过索引进去获取返回值。
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
链:Chains
可以将任务链接在一起,在一个人返回后进行调用另外一个任务:
>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
或 partial chain
>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
链也可以这样写:
>>> (add.s(4, 4) | mul.s(8))().get()
64
和弦:Chords
和弦是一个带有回调的组:
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90
链接到其他任务的组将自动转换为和弦:
>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90
这些原语都是签名的类型,可以根据需要进行组合,例如:
>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
有关更多工作流信息,请参阅用户指南中 Canvas 章节。
Celery 支持 AMQP 中提供的所有路由,可以将消息发送到指定的任务队列路由。
通过 task_routes 可以设置一个按名称分配的路由任务队列,将所有的内容集中存放在一个位置:
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
可以在程序是使用 queue 参数进行指定队列:
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')
可以通过设置运行职程(Worker)时指定职程(Worker)从某个队列中进行消费(celery worker -Q):
$ celery -A proj worker -Q hipri
也可以通过“,”作为分割符进行设置多个队列,例如,可以将默认队列和 hipri 队列一起通过职程(Worker)进行消费,其中默认队列 celery 由于历史原因被命名:
$ celery -A proj worker -Q hipri,celery
队列名称的顺序不分前后,职程(Worker)给予队列分配的权重是相同的。 相关路由的信息以及使用 AMQP 路由的全部功能,详情请参考路由任务:Routing Tasks。
使用 RabbitMQ(AMQP)、Redis 或 Qpid 作为中间人(Broker),可以在运行时控制和检查职程(Worker)。 例如,当前职程(Worker)正在处理的任务:
$ celery -A proj inspect active
这是通过广播消息实现的,集群中所有职程(Worker)都会所有收到远程控制发出的指令。 也可以通过 --destination 选项指定一个或多个职程(Worker)进行操作,使用“,”进行分割职程(Worker)主机列表:
$ celery -A proj inspect active --destination=celery@example.com
如果没有提供目的地,那么每个工作人员都将采取行动并回复请求。
celery inspect 命令不能修改程序,只能进行查看职程(Worker)概况以及统计信息,可以通过 help 进行查看:
$ celery -A proj inspect --help
celery control 命令可以查看实际上改变了工作在运行时的状况:
$ celery -A proj control --help
例如,可以强制职程(Worker)启用事件消息(用于监控任务以及职程(Worker)):
$ celery -A proj control enable_events
启动事件后,可以启动事件转储程序,进行查看职程(Worker)目前执行的状况:
$ celery -A proj events --dump
或者可以启动 curses 接口:
$ celery -A proj events
当监控完毕之后,可以禁用事件:
$ celery -A proj control disable_events
celery status 命令可以远程控制并且显示集群中职程(Worker)的列表:
$ celery -A proj status
可以通过查阅 监控和管理手册:Monitoring and Management Guide ,查看 Celery 有关命令以及监控信息。
内部和消息中的所有的时间和日期使用的都是 UTC 时区。 当职程(Worker)收到消息时,例如倒计时设置,会将 UTC 时间转换为本地时间。如果需要使用与系统不同的时区,可以通过 timezone进行配置:
app.conf.timezone = 'Europe/London'
默认情况下,默认的配置项没有针对吞吐量进行优化,默认的配置比较合适大量短任务和比较少的长任务。 如果需要优化吞吐量,请参考优化:Optimizing。 如果使用的中间人是 RabbitMQ,可以将换成 librabbitmq 模块(通过 C 语言实现的AMQP客户端):
$ pip install librabbitmq
现在您已经阅读完毕本文档,您已经继续阅读 用户指南。
如果您愿意,还有一个 API参考。