测试工作中接触到进程管理工具,学习并记录
是一个使用python开发的任务调度工具,理解时可以分为两部分来理解:Broker、APP、Worker。
直接pip install -U Celery
,安装之前记得更新pip
from celery import Celery
# 创建一个celery实例,使用rabbitmq作为broker
app = Celery('demo', broker='amqp://username:passwd@ip:port/varhost')
# 创建一个celery实例,使用redis作为broker
app = Celery('demo', broker='redis://:password@localhost//')
# 创建任务函数
@app.task
def my_task():
print("任务函数正在执行....")
celery -A task worker --loglevel=info
from task import my_task
my_task.delay()
至此,一个简单的celery的小实验已经完成了,基本就是这样使用的
- app.task装饰后将add函数变成一个异步的任务,add.delay函数将任务序列化发送到rabbitmq/redis;
- 该过程创建一个名字为celery的exchange,类型为direct;同时会创建一个名为celery的queue,队列和交换机使用路由键celery绑定;
- 打开rabbitmq管理后台,可以看到有一条消息已经在celery队列中
2.当使用redis作为broker时
- 如果使用redis作为任务队列中间人,在redis中存在两个键 celery 和 _kombu.binding.celery , _kombu.binding.celery 表示有一名为 celery 的任务队列(Celery 默认),而 celery为默认队列中的任务列表
worker时实例的核心执行模块,有不同的类型,这里对worker的理解进行了详细的介绍
开启worker后就可以对调用进行实际的运行
celery -A app.celery_tasks.celery worker -Q queue --loglevel=info
- A参数指定celery对象的位置,该app.celery_tasks.celery指的是app包下面的celery_tasks.py模块的celery实例,注意一定是初始化后的实例,
- Q参数指的是该worker接收指定的队列的任务,这是为了当多个队列有不同的任务时可以独立;如果不设会接收所有的队列的任务;
- l参数指定worker的日志级别;
执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对
from celery import Celery
# 增加backend参数,同时在定义的函数中增加了返回值
app = Celery('demo',
broker='amqp://username:passwd@ip:port/varhost'
backend='redis://username:passwd@ip:6390/db',
)
# 创建任务函数
@app.task
def my_task(a, b):
print("任务函数正在执行....")
return a + b
from celery import Celery
app = Celery('demo')
# 增加配置
app.conf.update(
result_backend='url',
broker_url='url',
)
新建一个py文件celeryconfig.py用于存储配置信息
result_backend='url'
broker_url='url'
创建app时将创建的配置文件引入
from celery import Celery
import celeryconfig
app = Celery('demo')
# 从创建的配置模块中加载配置
app.config_from_object('celeryconfig')
可以是将celery任务扔给supervisor进行管理,supervisor的使用在同一专栏中有介绍,可以点击查看
项目中的使用可以看这里,涉及自动发现实例、任务调用时可以传递的参数等还需要继续学习