消息队列配置文档:
使用 MongoDB数据库:
1. 安装 MongoDB.
2. easy_install pymongo
3. easy_install celery
4. easy_install django celery
5. settings.py 配置
增加 app INSTALLED_APPS = ('djcelery','tasks',)
# Using the database to store task state and results.
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
#: Only add pickle to this list if your broker is secured
#: from unwanted access (see userguide/security.html)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# CELERY_TIMEZONE = 'Asia/Shanghai'
# from celery.schedules import crontab
# CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 7:30 A.M
# 'add-every-monday-morning': {
# 'task': 'tasks.add',
# 'schedule': crontab(hour=7, minute=30, day_of_week=1),
# 'args': (16, 16),
# },
# }
# mongodb 使用以下配置文件
BROKER_URL = 'mongodb://127.0.0.1:27017'
CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
CELERY_MONGODB_BACKEND_SETTINGS = {
'database': 'daibang',
'taskmeta_collection': 'my_taskmeta_collection',
}
6. celery.py 内容:
#!/usr/bin/python
#coding=utf-8
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'wei.settings')
app = Celery('tasks.tasks')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
7. 在任意app目录下 建tasks.py 文件,构建自己的函数:
例:
#!/usr/bin/python
#coding=utf-8
import logging
import traceback
from set_environ import set_environ
set_environ()
# from __future__ import absolute_import
from celery import shared_task
from wei.celery import app
from django.shortcuts import get_object_or_404
# @app.task(ignore_result=True)
# @app.task(bind=True)
@app.task
def test_task():
# 插入一条记录 后发送邮件给相关人
tuser = User.objects.get(id=1)
msg = MessageSendRecord(
user=tuser,
loan_name='张三',
repay_type='repayment',
repay_amount='1',
is_send=False,
loan_url='http://www.test.com',
)
msg.save()
try:
mail_list = [tuser.email, ]
send_mail(
subject='测试邮件',
message='邮件内容',
from_email=MESSAGE_EMAIL_HOST_USER, # 发件邮箱
recipient_list=mail_list,
fail_silently=False,
auth_user=MESSAGE_EMAIL_HOST_USER, # SMTP服务器的认证用户名
auth_password=MESSAGE_EMAIL_HOST_PASSWORD, # SMTP服务器的认证用户密码
connection=None
)
msg.is_send = True
msg.save()
except Exception:
# 发送失败 记录日志
return False
if __name__ == '__main__':
result = test_task.delay() # 加入队列中
print 'OK'
8. MongoDB数据库 Collections(General) - messages 表中查看队列数据
9. 开启服务 python manage.py celery -A wei worker -l info
详细配置请详见:http://docs.celeryproject.org/en/latest/getting-started/introduction.html
如果没有报错,程序已经在执行,查看数据库表MessageSendRecord 是否发生变化。
使用 mysql数据库:
执行上面2、3、4 步骤
5. settings.py 配置
增加 app INSTALLED_APPS = (
'djcelery',
'tasks',
'kombu.transport.django', # mysql 增加此项
)
# Using the database to store task state and results.
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
mysql 使用以下配置文件
BROKER_URL = 'django://'
CELERY_RESULT_BACKEND = 'db+mysql://daybang:ocean@127.0.0.1:3306/wei_test'
增加:
easy_install sqlalchemy # mysql 数据库需要安装此项
python manage.py migrate kombu.transport.django # 创建 消息队列所需表
python manage.py migrate djcelery # 创建 消息队列所需表
6、7、8、9 与上面相同。