Flask项目中Celery的使用

  • Post author:
  • Post category:其他




Flask项目中Celery的使用



一、项目目录结构

在这里插入图片描述



二、celery_config.py

  • 在config中新增文件celery_config.py
# -*- coding: utf8 -*-


# package import
from .redis_config import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD

BROKER_URL = 'redis://:{}@{}:{}/1'.format(REDIS_PASSWORD, REDIS_HOST, REDIS_PORT)
RESULT_BACKEND = 'redis://:{}@{}:{}/2'.format(REDIS_PASSWORD, REDIS_HOST, REDIS_PORT)
TASK_SERIALIZER = 'json'
RESULT_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
TIMEZONE = 'Asia/Shanghai'
ENABLE_UTC = True



三、task.py

  • 在app.py的同级目录下创建task.py文件
 -*- coding: utf8 -*-

# package import
import time
from celery import Celery
from flask import Flask
from config import celery_config

"""
新初始化一个Flask,否则会造成循环引用。
名字一定不能与app.py中的Flask应用重名
例如app.py中的Flask应用的名称为app,tasks.py文件中的Flask应用名称为application
"""
application = Flask(__name__)

application.config.from_object(celery_config)


def make_celery(app):  # 官方提供的使用方法
    celery_server = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
                    broker=app.config['BROKER_URL'])
    celery_server.conf.update(app.config)

    task_base = celery_server.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery_server.Task = ContextTask
    return celery_server


celery = make_celery(application)


# 增加一个实例任务,add()
@celery.task
def add(x, y):

    time.sleep(10)

    return x + y



四、在蓝图中使用

@user_semantic_analysis.route('/hello/celery', methods=['GET', 'POST'])
def hello_celery():
	
	# 使用add()方法
    add.delay(1, 5320)
    return 'hello celery'



五、启动celery服务

# 在终端中输入命令启动celery服务
celery -A tasks worker -l info -P eventlet  -c 10
  • celery成功调用

    在这里插入图片描述



版权声明:本文为weixin_42521409原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。