当前位置: 首页 > 工具软件 > Celery > 使用案例 >

celery 停止_Celery 使用 +

穆德海
2023-12-01

在项目中使用celery

项目结构:

proj/__init__.py # 注意这个文件 django是不带的默认 对应django的项目

/celery.py

/tasks.py

proj/celery.py

from __future__ import absolute_import, unicode_literals

from celery import Celery

app = Celery('proj',

broker='amqp://', # 使用的消息队列

backend='amqp://', # 使用的结果存储

include=['proj.tasks']) # 任务们

if __name__ == '__main__':

app.start()

proj/tasks.py

from __future__ import absolute_import, unicode_literals

from .celery import app

@app.task

def add(x, y):

return x + y

启动工作进程:

celery -A proj worker -l info

-------------- celery@halcyon.local v4.0 (latentcall)

---- **** -----

--- * *** * -- [Configuration]

-- * - **** --- . broker: amqp://guest@localhost:5672//

** ---------- . app: main:0x1012d8590

** ---------- . concurrency: 8 (processes)

** ---------- . events: OFF (enable -E to monitor this worker)

** ----------

*** --- * --- [Queues]

-- ******* ---- . celery: exchange:celery(direct) binding:celery

--- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.

broker就是配置中的消息队列,concurrency就是工作进程数量(默认是cpu数量)如果所有进程都被占用了那么新的任务需要等待,events是指定celery是否发送监控信息,queue就是队列。

终止工作进程

直接control -c(按照上述命令在前台启动的),当然了如果在后台启动和停止:

celery multi start w1 -A proj -l info

celery multi restart w1 -A proj -l info

# 异步关闭 立即返回

celery multi stop w1 -A proj -l info

# 等待关闭操作完成

celery multi stopwait w1 -A proj -l info

默认会在当前目录下创建pid和log文件,指定路径和名称:

celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \

--logfile=/var/log/celery/%n%I.log

关于-A(--app)选项:

指定celery app,可以用 module.path:attribute的形式指定,也可直接指定package,如--app=proj,然后会搜索其中的proj.app,proj.celery等。

操作已被创建的worker时,不需要参数什么的完全一样,只要pidfile和logfile一样即可。

调用

add.delay(1, 1)

add.apply_async((1, 1))

add.apply_async((2, 2), queue='lopri', countdown=10)

# 指定要发送到哪个队列 运行时间延迟countdown

# 结果获得

res = add.delay(1, 1)

res.get()

celery默认不产生结果的原因:因为各种应用的需求不同,而大多数任务保存返回值没有什么意义,而且产生结果并不是用来监控任务和工作进程,应该是使用event消息的专门的监控模块。

res.id # 任务id uuid

# 任务出错会raise 错误 可指定propagate

res.get(propagate=False)

res.failed()

res.successful()

res.state

# PENDING -> STARTED -> SUCCESS(FAILURE)

# STARTED 需要设置 task_track_started 或在任务级别设置@task(track_started=True)

# 重试的情况也有

# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

工作流

有时想把一个任务的签名(包含了实参和任务的执行选项)发送到另一个进程或把参数发送到另一个函数。

add.signature((2, 2), countdown=10) # with args and kwargs

add.s(2, 2) # with args only

然后获得的签名就可以调用:

s1 = add.s(2, 2)

res = s1.delay()

res.get()

# 也可以部分指定参数 然后在调用时补全参数

s2 = add.s(2)

res = s2.delay(8)

res.get()

调用方式

之前一直使用的是delay(*args, **kwargs)的方式来调用,还有apply_async的调用方式,支持一些执行时的选项

Groups

group是同时调用的一系列任务,返回特殊的结果可以获得组内所有的执行结果,可以按照任务的顺序从该结果中获得对应任务的结果。

group(add.s(i, i) for i in xrange(10))().get()

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# 组也支持部分参数化

g = group(add.s(i) for i in xrange(10))

g(10).get()

[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

把任务连接起来,执行完一个后把结果传入另一个再调用。

chain(add.s(4, 4) | mul.s(8))().get()

g = chain(add.s(4) | mul.s(8))

g(4).get()

# 也支持部分参数化

chords

chord是一个带callback的group

from celery import chord

from proj.tasks import add, xsum

chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()

# 而group的结果发送到另一个任务则自动转换为chord

(group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()

# 可以把单任务的结果发送到组中

upload_doc.s(file) | group(apply_filter.s() for filter in filters)

Routing

在app层面配置task到某个队列:

app.conf.update(

task_routes = {

'proj.tasks.add': {'queue': 'hipri'},

},

)

在调用时指定使用某个队列:

add.apply_async((2, 2), queue='hipri')

启动工作进程时指定该进程处理的队列:

celery -A proj worker -Q hipri

# 指定多个处理的队列 默认队列名称:celery

celery -A proj worker -Q hipri,celery

远程控制

查看工作进程当前的工作:

celery -A proj inspect active

这个命令是一个广播,所有工作进程都会收到。(需要启用event)

celery -A proj inspect active --destination=celery@example.com

让工作进程启用event

celery -A proj control enable_events

 类似资料: