This article summarizes a deployment approach for running scheduled tasks with Django and Celery.

1. Install the required packages

requirements.txt:

django>=2.1.5
requests>=2.21.0
mysqlclient>=1.3.14
redis>=3.0.1
celery>=4.2.1
eventlet>=0.24.1
django-celery-beat>=1.4.0
django-celery-results>=1.0.4

Run the installation: pip3 install -r requirements.txt

Check the installed packages with pip3 list | grep "kombu\|celery":

celery                4.2.1
django-celery-beat    1.4.0
django-celery-results 1.0.4
kombu                 4.2.2.post1

What each package does:

django => the web framework itself
requests => HTTP client library
mysqlclient => MySQL driver
redis => Redis client (Redis serves as the Celery broker)
celery => the task queue itself
eventlet => coroutine-based concurrency pool for the worker
django-celery-beat => manages periodic task schedules from the Django admin
django-celery-results => stores task execution results in the Django database
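Since Redis acts as the broker, it can help to verify connectivity before wiring up Celery. A minimal sketch using the redis-py client installed above (assumes a Redis server listening on 127.0.0.1:6379):

```python
import redis

# Connect to the same Redis instance that CELERY_BROKER_URL will point at.
conn = redis.Redis(host='127.0.0.1', port=6379, db=0)

# ping() returns True when the broker is reachable, and raises
# redis.exceptions.ConnectionError otherwise.
if conn.ping():
    print('Redis broker is reachable')
```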

2. Configure Celery

Below is the basic file layout of a generated Django project; the following steps add a few files to configure Celery.

<mysite>/
├── <mysite>
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── <myapp>
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── tests.py
│   ├── urls.py
│   ├── views.py
│   └── models.py
├── manage.py
└── requirements.txt

I. Add celery.py under <mysite>/<mysite>

File contents:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# force=True matters: without it some settings from settings.py are not loaded.
app.config_from_object('django.conf:settings', namespace='CELERY', force=True)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# Allow Celery to run as the root user
platforms.C_FORCE_ROOT = True

II. Edit <mysite>/<mysite>/__init__.py

File contents:

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

III. Edit <mysite>/<mysite>/settings.py

Add the following Celery-related settings:

INSTALLED_APPS = [
    # ... your existing apps ...
    'django_celery_results',
    'django_celery_beat',
]



TIME_ZONE = 'Asia/Shanghai'

USE_I18N = True
USE_L10N = False
USE_TZ = True

CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'django-db'  # store task results in the Django database
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERYD_CONCURRENCY = 20  # number of concurrent worker processes
CELERY_TIMEZONE = 'Asia/Shanghai'  # there is no "Beijing" zone; keep this consistent with TIME_ZONE
CELERYD_FORCE_EXECV = True    # important: can prevent deadlocks in some cases
CELERYD_MAX_TASKS_PER_CHILD = 20  # recycle each worker after 20 tasks to guard against memory leaks
CELERYD_TASK_TIME_LIMIT = 60    # tasks running longer than this are killed with SIGKILL
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_IGNORE_RESULT = True  # note: True suppresses result storage, which conflicts with the django-db backend above
CELERY_CREATE_MISSING_QUEUES = True
CELERYD_TASK_SOFT_TIME_LIMIT = 600
CELERY_TASK_RESULT_EXPIRES = 600
CELERY_ENABLE_UTC = False


from kombu import Exchange, Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('matcher', Exchange('matcher'), routing_key='matcher.#'),
)
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_TASK_ROUTES = {
    'worker_matcher.tasks.runcmd': {'queue': 'matcher', 'routing_key': 'matcher.runcmd', },
    '*': {'queue': 'default', 'routing_key': 'default', },
}
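As a sanity check on the routing rules above, their first-match/wildcard behavior can be mimicked in plain Python (a hypothetical helper for illustration, not part of Celery):

```python
from fnmatch import fnmatch

# Mirror of CELERY_TASK_ROUTES above; order matters, the first match wins.
TASK_ROUTES = [
    ('worker_matcher.tasks.runcmd',
     {'queue': 'matcher', 'routing_key': 'matcher.runcmd'}),
    ('*', {'queue': 'default', 'routing_key': 'default'}),
]

def resolve_route(task_name):
    """Return the queue/routing_key mapping for the first matching pattern."""
    for pattern, route in TASK_ROUTES:
        if fnmatch(task_name, pattern):
            return route
    return None
```

Every task except worker_matcher.tasks.runcmd falls through the '*' wildcard into the default queue.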

IV. Create the database tables

python3 manage.py migrate

V. Add the task file <mysite>/<myapp>/tasks.py

File contents:

from celery import shared_task

@shared_task
def print_hello():
    return 'hello celery and django...'
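Besides being scheduled by beat, the task can also be enqueued directly from Django code. A minimal sketch (assuming a worker is running; myapp stands in for your actual app name):

```python
from myapp.tasks import print_hello  # replace myapp with your app's name

# Fire-and-forget: send the task through the Redis broker to a worker.
async_result = print_hello.delay()

# apply_async() gives more control, e.g. target a queue defined in
# CELERY_QUEUES and delay execution by 10 seconds.
print_hello.apply_async(queue='default', countdown=10)
```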

VI. Add tasks in the Django admin

Crontabs => cron-style schedules (minute/hour/day rules)
Intervals => fixed-interval schedules (e.g. every 30 seconds)
Periodic tasks => the periodic tasks themselves, each bound to a schedule
Solar events => schedules tied to solar events (dawn, sunrise, solar noon, sunset, dusk)
Task results => execution results of the scheduled tasks
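The same schedules can also be created programmatically through the models that django-celery-beat exposes. A sketch to run in the Django shell, assuming the migrations above have been applied (the task path matches the tasks.py example):

```python
from django_celery_beat.models import IntervalSchedule, PeriodicTask

# Reuse an existing "every 30 seconds" interval, or create it.
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=30,
    period=IntervalSchedule.SECONDS,
)

# Bind the task to the interval; the name is a unique display label.
PeriodicTask.objects.create(
    interval=schedule,
    name='print hello every 30s',
    task='myapp.tasks.print_hello',  # replace myapp with your app's name
)
```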

VII. Add Supervisor configs to keep beat and the worker running

; ================================
;  celery beat supervisor
; ================================

[program:celerybeat]
command=/usr/local/bin/celery beat -A spider --schedule /WorkDir/webroot/spider/beat.db --loglevel=INFO

directory=/WorkDir/webroot/spider
user=mider
numprocs=1
stdout_logfile=/WorkDir/log/celery_beatout.log
stderr_logfile=/WorkDir/log/celery_beaterr.log
autostart=true
autorestart=true
startsecs=10
stopasgroup=true
priority=999


; ==================================
;  celery worker supervisor
; ==================================

[program:celeryworker]
command=/usr/local/bin/celery worker -A spider --loglevel=INFO -P eventlet -E

directory=/WorkDir/webroot/spider
user=mider
numprocs=1
stdout_logfile=/WorkDir/log/celery_worker.log
stderr_logfile=/WorkDir/log/celery_worker.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs = 600
stopasgroup=true
priority=1000

3. Modifying an existing Celery task and making the change take effect

I. Edit the task code in tasks.py

II. Disable the periodic task in the Django admin

III. Restart the celery worker

/usr/local/bin/celery worker -A proj --loglevel=INFO -P eventlet -E

IV. Re-enable the periodic task in the Django admin

4. Adding a new Celery task and making it take effect

I. Add the task code to tasks.py

II. Disable the periodic tasks in the Django admin

III. Restart celery beat

/usr/local/bin/celery beat -A proj --loglevel=INFO -S django_celery_beat.schedulers:DatabaseScheduler

IV. Add the new periodic task in the Django admin

V. Operating Celery from the command line

celery -A spider -b redis://127.0.0.1:6379/1 inspect registered
celery -A spider -b redis://127.0.0.1:6379/1 call <myapp>.tasks.<task_name>

or:

$ python3 manage.py shell
>>> from myapp.tasks import my_task
>>> eager_result = my_task.apply()

For example, invoking a few of this project's tasks:

from wechat.tasks import getdetails
eager_result = getdetails.apply()

from wechat.tasks import checkkwds
eager_result = checkkwds.apply()

from wechat.tasks import downloadarticles
eager_result = downloadarticles.apply()

from system.tasks import getselfmipproxy
eager_result = getselfmipproxy.apply()

That concludes this summary of deploying scheduled tasks with Django and Celery.