Django中如何使用Celery执行异步任务
作者:G_scsd
这篇文章主要介绍了Django中如何使用Celery执行异步任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
DJango使用Celery异步任务
1. 安装
pip install celery==4.4.7
2. 配置
2.1 setting.py
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' CELERY_RESULT_SERIALIZER = 'json'
2.2 setting同级目录
- 2.2.1新建celery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery from django.conf import settings # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoFunction.settings') app = Celery('test_celery') # 定义全局的Celery任务 test_celery 随便写 # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # 固定配置 # Load task modules from all registered Django app configs. app.autodiscover_tasks(['CeleryFunc']) # 把需要用定时任务的APP加进去 @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
- 2.2.2 init.py
from __future__ import absolute_import, unicode_literals from .celery import app as celery_app # 重命名 __all__ = ['celery_app'] # 把重命名的APP写进去
3. Django APP 中新建tasks.py文件
import time from loguru import logger # Django环境的初始化 # 在任务处理者一段加这几句 import os import django os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoFunction.settings') django.setup() from DjangoFunction import celery_app # bind=True, 加了这个就必须加 self, 这个能打印定时任务的信息(这两个都可以不加) @celery_app.task(bind=True) def my_asynch_task(self): """ 我的 异步 任务 :return: """ logger.info('开始执行异步任务') time.sleep(3) print('------', self.request.id) logger.info('结束执行异步任务') return 10
4. views.py文件
from rest_framework.views import APIView from rest_framework.response import Response from CeleryFunc.tasks import my_asynch_task import time class AsyncAPIView(APIView): """ 数据集 """ def get(self, request, format=None): try: start_time = time.time() # 这两种方法都可以,不加后面这个方法就不是异步了 # result = my_asynch_task.apply_async() result = my_asynch_task.delay() print(dir(result)) print('result.state = ', result.state) print('result.status = ', result.status) print('result.successful = ', result.successful) print('result.result = ', result.result) print('result.name = ', result.name) resp = { 'code': 200, 'message': 'success', } end_date = time.time() print('耗时: ', end_date - start_time) except Exception as e: resp = { 'code': 300, 'message': str(e), } return Response(resp)
后台可以启动这个命令来查看异步任务是否加入和日志
DjangoFunction是项目名称
celery worker -A DjangoFunction -l info -P eventlet
项目结构
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。