Celery official documentation:
Celery – Distributed Task Queue — Celery 5.3.0b1 documentation
Official documentation for using Celery with Django:
First steps with Django — Celery 5.3.0b1 documentation
This guide uses a Python 3.9 environment, with the following project as the example:
GitHub – nineaiyu/xshare: a file-sharing platform based on Aliyun Drive
1. Install Django and celery
You don't need to list Django explicitly: django-celery-beat and django-celery-results depend on Django, so installing them pulls in a compatible Django version automatically.
pip install celery django-celery-beat django-celery-results celery[redis]
2. Configure celery
Edit the Django settings.py and add the following:
# https://docs.celeryq.dev/en/stable/userguide/configuration.html
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'default'
# Build the broker URL from the existing Redis cache configuration
DJANGO_DEFAULT_CACHES = CACHES['default']
CELERY_BROKER_URL = 'redis://:%s@%s/2' % (
    DJANGO_DEFAULT_CACHES["OPTIONS"]["PASSWORD"], DJANGO_DEFAULT_CACHES["LOCATION"].split("/")[2])
CELERY_WORKER_CONCURRENCY = 10  # worker concurrency
CELERYD_FORCE_EXECV = True  # important: can prevent deadlocks in some cases
CELERY_RESULT_EXPIRES = 3600  # task result expiry time (seconds)
CELERY_WORKER_DISABLE_RATE_LIMITS = True  # disable all task rate limits, even if individual tasks declare one
CELERY_WORKER_PREFETCH_MULTIPLIER = 60  # how many tasks each worker prefetches from redis at a time
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000  # a worker child process is replaced after executing this many tasks; I recommend a reasonably large value, e.g. 200
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = True
# CELERY_ACCEPT_CONTENT = ['json']
# CELERY_TASK_SERIALIZER = 'json'
# Message serialization format: pickle is used here because model objects are passed
# as task arguments. When using pickle, the worker must be started as a non-root user.
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle']
CELERY_TASK_SERIALIZER = 'pickle'
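The 'django-db' result backend and the database scheduler used later require django_celery_results and django_celery_beat to be registered and migrated, and the broker URL above is derived from an existing Redis-backed CACHES entry. A rough sketch of the assumed prerequisites (this assumes django-redis; the host, port and password values are placeholders):
INSTALLED_APPS = [
    # ... existing apps ...
    'django_celery_results',
    'django_celery_beat',
]
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'OPTIONS': {'PASSWORD': 'your-redis-password'},
    }
}
Then create the result and schedule tables:
python manage.py migrate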
For more detailed configuration options, see the documentation:
Configuration and defaults — Celery 5.2.7 documentation
3. Create the celery instance
Create a celery.py file in the project package, at the same level as settings.py:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# project : server
# filename : celery
# author : ly_13
# date : 2022/9/23
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xshare.settings')
app = Celery('xshare')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
You also need to add the following to xshare/__init__.py:
from .celery import app as celery_app
__all__ = ('celery_app',)
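At this point the wiring can be sanity-checked from a Django shell (assuming Redis is running and a worker has been started as in step 7); debug_task is the sample task defined in celery.py above:
python manage.py shell
>>> from xshare.celery import debug_task
>>> debug_task.delay()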
4. Create a tasks.py file under any app; the directory structure looks something like this:
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py
5. Create a periodic task; periodic tasks use the app.task decorator.
Add the following to tasks.py:
from xshare.celery import app
from datetime import datetime, timedelta
from django.utils import timezone
from api.models import ShareCode
@app.task
def auth_clean_invalid_share():
    default_timezone = timezone.get_default_timezone()
    value = timezone.make_aware(datetime.now(), default_timezone)
    deleted, _ = ShareCode.objects.filter(file_id__isnull=True, expired_time__lt=value).delete()
Add the following to settings.py; it schedules the task to run every day at 03:03:
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'auth_clean_invalid_share_job': {
        'task': 'api.tasks.auth_clean_invalid_share',
        'schedule': crontab(hour=3, minute=3),
        'args': ()
    }
}
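Because beat is started with the django_celery_beat DatabaseScheduler (see step 7), schedules can also be created or edited at runtime through the django_celery_beat models (or the Django admin) instead of being hard-coded in CELERY_BEAT_SCHEDULE. A rough sketch of the equivalent database entry:
from django_celery_beat.models import CrontabSchedule, PeriodicTask

# Create (or reuse) a "03:03 every day" crontab schedule
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='3', hour='3', day_of_week='*', day_of_month='*', month_of_year='*',
)
# Register the task against that schedule
PeriodicTask.objects.get_or_create(
    name='auth_clean_invalid_share_job',
    task='api.tasks.auth_clean_invalid_share',
    crontab=schedule,
)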
6. Define an asynchronous task; this uses shared_task.
Again, add the following to tasks.py:
import logging

from celery import shared_task

logger = logging.getLogger(__name__)


@shared_task
def sync_drive_size(batch_queryset):
    for drive_obj in batch_queryset:
        try:
            # get_aliyun_drive is a helper from the xshare project that returns an Aliyun drive client
            ali_obj = get_aliyun_drive(drive_obj)
            default_drive_obj = ali_obj.get_default_drive()
            drive_obj.total_size = default_drive_obj.total_size
            drive_obj.used_size = default_drive_obj.used_size
            drive_obj.active = True
            drive_obj.save(update_fields=['total_size', 'used_size', 'active', 'updated_time'])
            logger.info(f'{drive_obj} update size success')
        except Exception as e:
            logger.warning(f'{drive_obj} update drive size failed:{e}')
To use it, simply import the task and call it like this:
sync_drive_size.apply_async(args=(batch_queryset,))
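When no extra options are needed, apply_async(args=...) can also be written with the shorter, equivalent delay() form:
sync_drive_size.delay(batch_queryset)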
If the task needs to be delayed, for example by 600 seconds, an eta can be computed like this:
def eta_second(second):
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)
    return utc_ctime + time_delay

task = sync_drive_size.apply_async(args=([drive_obj],), eta=eta_second(60 * 10))
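If only a relative delay is needed, apply_async also accepts a countdown in seconds, which avoids the manual UTC conversion above:
task = sync_drive_size.apply_async(args=([drive_obj],), countdown=60 * 10)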
Fetch the result of the asynchronous task with a 1-second timeout; if the result is not available within the timeout, a TimeoutError is raised:
result = c_task.get(propagate=False, timeout=1)
If the task has completed successfully, its stored result data can be cleaned up:
if c_task.successful():
    c_task.forget()
A complete example combining the above:
from celery.exceptions import TimeoutError

c_task = sync_drive_size.apply_async(args=(batch_queryset,))
task_id = c_task.id
try:
    result = c_task.get(propagate=False, timeout=1)
    logger.info(f"task_id:{task_id} result:{result}")
except TimeoutError:
    logger.error(f"task_id:{task_id} result timeout.")
    result = {'task_id': task_id}
if c_task.successful():
    c_task.forget()
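If only the task id was stored (for example in the database or a response payload), the result can be looked up later via AsyncResult; a minimal sketch:
from celery.result import AsyncResult

from xshare.celery import app

res = AsyncResult(task_id, app=app)
print(res.status)       # PENDING / STARTED / SUCCESS / FAILURE
if res.successful():
    print(res.result)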
7. Start the celery worker and beat
## Development environment: start worker and beat together
celery -A xshare worker --uid=nginx --beat --scheduler django --loglevel=debug
## Start only the worker; since pickle is configured as the message serializer, for safety the worker must be started with a non-root uid
celery -A xshare worker -l INFO --uid=nginx
## Start only beat
celery -A xshare beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
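Once the worker is up, you can check that it is reachable and see which tasks it has registered with celery's built-in inspection commands:
celery -A xshare status
celery -A xshare inspect registered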
That covers the basic operations; for more advanced usage, see the official documentation:
Celery – Distributed Task Queue — Celery 5.3.0b1 documentation