Using the Celery asynchronous task queue with Django

Celery official documentation:

Celery – Distributed Task Queue — Celery 5.3.0b1 documentation

Official documentation for using Celery with Django:

First steps with Django — Celery 5.3.0b1 documentation

This walkthrough uses a Python 3.9 environment and takes the following project as the example:

GitHub – nineaiyu/xshare: 基于阿里云盘的文件分享平台

1. Install Django and Celery

There is no need to install Django separately: the packages below (django-celery-beat and django-celery-results) depend on Django, so a compatible Django version is pulled in automatically.

pip install celery django-celery-beat django-celery-results celery[redis]
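For reproducible installs, the same dependencies can also be pinned in a requirements.txt. The pins below are only illustrative, not a tested combination:

celery[redis]>=5.2
django-celery-beat
django-celery-results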

2. Configure Celery

Edit the Django settings file settings.py and add the following:

# https://docs.celeryq.dev/en/stable/userguide/configuration.html?
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'default'

# broker redis
DJANGO_DEFAULT_CACHES = CACHES['default']
CELERY_BROKER_URL = 'redis://:%s@%s/2' % (
    DJANGO_DEFAULT_CACHES["OPTIONS"]["PASSWORD"], DJANGO_DEFAULT_CACHES["LOCATION"].split("/")[2])

CELERY_WORKER_CONCURRENCY = 10  # worker concurrency (number of worker processes)
CELERYD_FORCE_EXECV = True  # very important: can prevent deadlocks in some situations
CELERY_RESULT_EXPIRES = 3600  # how long task results are kept (seconds)

CELERY_WORKER_DISABLE_RATE_LIMITS = True  # disable per-task rate limits
CELERY_WORKER_PREFETCH_MULTIPLIER = 60  # number of tasks each worker prefetches from Redis at a time

CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000  # recycle a worker child process after it has executed this many tasks

CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = True

# CELERY_ACCEPT_CONTENT = ['json']
# CELERY_TASK_SERIALIZER = 'json'

# Celery message serialization: pickle is used because model objects are passed as task arguments;
# with pickle the worker must be started as a non-root user.
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle']
CELERY_TASK_SERIALIZER = 'pickle'
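The CELERY_BROKER_URL above is built from the Django cache settings, which assumes CACHES['default'] is a django-redis style entry with a redis:// LOCATION and a PASSWORD in OPTIONS, roughly like the sketch below (host, port, database and password are placeholders):

# Hypothetical CACHES entry that the broker URL construction above relies on
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'PASSWORD': 'your-redis-password',
        },
    }
}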

More detailed configuration options are documented here:

Configuration and defaults — Celery 5.2.7 documentation
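Note that the 'django-db' result backend and the database scheduler come from django-celery-results and django-celery-beat; if the project does not already register them, they need to be added to INSTALLED_APPS and migrated. A sketch, not copied from the xshare settings:

# settings.py
INSTALLED_APPS = [
    # ...
    'django_celery_results',   # stores task results ('django-db' result backend)
    'django_celery_beat',      # stores periodic task schedules in the database
]

# then create their tables:
#   python manage.py migrate django_celery_results
#   python manage.py migrate django_celery_beat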

3. Create the Celery application instance

Create a celery.py file in the project package, at the same level as settings.py:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# project : server
# filename : celery
# author : ly_13
# date : 2022/9/23

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xshare.settings')

app = Celery('xshare')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Then add the following to xshare/__init__.py so the Celery app is loaded when Django starts:

from .celery import app as celery_app

__all__ = ('celery_app',)
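With a worker running (see step 7), the wiring can be sanity-checked from the Django shell by sending the debug_task defined above:

# python manage.py shell
from xshare.celery import debug_task

# debug_task is declared with ignore_result=True, so instead of inspecting a
# stored result, watch the worker log for the printed "Request: ..." line.
debug_task.delay()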

4. Create a tasks.py file in any app; the directory structure looks like this:

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

5. Create a periodic task; periodic tasks use the app.task decorator.

Add the following to tasks.py:

from xshare.celery import app
from datetime import datetime, timedelta
from django.utils import timezone
from api.models import ShareCode

@app.task
def auth_clean_invalid_share():
    """Delete expired share codes that are not bound to any file."""
    default_timezone = timezone.get_default_timezone()
    value = timezone.make_aware(datetime.now(), default_timezone)
    deleted, _ = ShareCode.objects.filter(file_id__isnull=True, expired_time__lt=value).delete()

Add the following to settings.py; it schedules the task to run every day at 03:03:

from celery.schedules import crontab


CELERY_BEAT_SCHEDULE = {
    'auth_clean_invalid_share_job': {
        'task': 'api.tasks.auth_clean_invalid_share',
        'schedule': crontab(hour=3, minute=3),
        'args': ()
    }
}
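crontab() accepts the usual cron-style fields, so other schedules can be expressed in the same dict; the entries below are purely illustrative (the task paths are hypothetical):

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # every 30 minutes
    'hypothetical_half_hour_job': {
        'task': 'api.tasks.some_task',       # hypothetical task
        'schedule': crontab(minute='*/30'),
    },
    # 08:00, Monday through Friday
    'hypothetical_weekday_job': {
        'task': 'api.tasks.another_task',    # hypothetical task
        'schedule': crontab(hour=8, minute=0, day_of_week='mon-fri'),
    },
}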

6. Define an asynchronous task using the shared_task decorator

Again in tasks.py, add the following (shared_task must be imported; get_aliyun_drive and logger are helpers from the xshare project):

from celery import shared_task


@shared_task
def sync_drive_size(batch_queryset):
    for drive_obj in batch_queryset:
        try:
            ali_obj = get_aliyun_drive(drive_obj)
            default_drive_obj = ali_obj.get_default_drive()
            drive_obj.total_size = default_drive_obj.total_size
            drive_obj.used_size = default_drive_obj.used_size
            drive_obj.active = True
            drive_obj.save(update_fields=['total_size', 'used_size', 'active', 'updated_time'])
            logger.info(f'{drive_obj} update size success')
        except Exception as e:
            logger.warning(f'{drive_obj} update drive size failed:{e}')

To call the task, simply import it and invoke it:

sync_drive_size.apply_async(args=(batch_queryset,))
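When only positional arguments are needed, apply_async(args=...) is equivalent to the shorter delay() form:

# equivalent to sync_drive_size.apply_async(args=(batch_queryset,))
sync_drive_size.delay(batch_queryset)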

If the task should run with a delay, e.g. 600 seconds later, an eta can be computed like this:

def eta_second(second):
    """Return a UTC datetime `second` seconds from now, for use as apply_async(eta=...)."""
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)
    return utc_ctime + time_delay


task = sync_drive_size.apply_async(args=([drive_obj],), eta=eta_second(60 * 10))
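Celery's apply_async also accepts a countdown argument, which expresses the same 10-minute delay without computing a UTC eta by hand:

# same 600-second delay using countdown instead of eta
task = sync_drive_size.apply_async(args=([drive_obj],), countdown=60 * 10)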

Fetch the result of an async task with a 1-second timeout; if no result arrives within the timeout, a TimeoutError is raised:

result = c_task.get(propagate=False, timeout=1)

If the task has already finished successfully, the stored result can be cleaned up:

if c_task.successful():
    c_task.forget()

Putting it all together:

from celery.exceptions import TimeoutError


c_task = sync_drive_size.apply_async(args=(batch_queryset,))
task_id = c_task.id
try:
    result = c_task.get(propagate=False, timeout=1)
    logger.info(f"task_id:{task_id} result:{result}")
except TimeoutError:
    logger.error(f"task_id:{task_id} result timeout.")
    result = {'task_id': task_id}
if c_task.successful():
    c_task.forget()
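If only the task id was kept (for example, returned to an API caller), the result can be looked up again later via AsyncResult, assuming the Celery app has been loaded (as it is through xshare/__init__.py):

from celery.result import AsyncResult

# task_id is assumed to be a previously saved id string
c_task = AsyncResult(task_id)
print(c_task.status)          # PENDING / STARTED / SUCCESS / FAILURE ...
if c_task.successful():
    print(c_task.result)
    c_task.forget()           # drop the stored result once it has been read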

7. Start the Celery worker and beat

## Development: start the worker and beat together in one process
celery -A xshare worker --uid=nginx --beat --scheduler django --loglevel=debug

## Worker only; since the message serializer is pickle, the worker must not run as root, so specify --uid for safety
celery -A xshare worker -l INFO --uid=nginx 

## Beat only
celery -A xshare beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
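For production it is common to daemonize the worker instead of keeping it in a foreground shell. A sketch using celery multi (the pid/log paths are placeholders; the directories must exist and be writable by the nginx user):

## start a single daemonized worker named w1
celery multi start w1 -A xshare -l INFO --uid=nginx \
    --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log

## stop it again
celery multi stop w1 --pidfile=/var/run/celery/%n.pid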

These are the basic operations; for more advanced usage, see the official documentation:

Celery – Distributed Task Queue — Celery 5.3.0b1 documentation



Copyright notice: This is an original article by ly1358152944, licensed under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.