Django中如何使用Celery执行异步任务
作者:G_scsd
这篇文章主要介绍了Django中如何使用Celery执行异步任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
DJango使用Celery异步任务
1. 安装
pip install celery==4.4.7
2. 配置
2.1 setting.py
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' CELERY_RESULT_SERIALIZER = 'json'
2.2 setting同级目录
- 2.2.1新建celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoFunction.settings')
app = Celery('test_celery') # 定义全局的Celery任务 test_celery 随便写
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY') # 固定配置
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(['CeleryFunc']) # 把需要用定时任务的APP加进去
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
- 2.2.2 init.py
from __future__ import absolute_import, unicode_literals from .celery import app as celery_app # 重命名 __all__ = ['celery_app'] # 把重命名的APP写进去
3. Django APP 中新建tasks.py文件
import time
from loguru import logger
# Django环境的初始化
# 在任务处理者一段加这几句
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoFunction.settings')
django.setup()
from DjangoFunction import celery_app
# bind=True, 加了这个就必须加 self, 这个能打印定时任务的信息(这两个都可以不加)
@celery_app.task(bind=True)
def my_asynch_task(self):
"""
我的 异步 任务
:return:
"""
logger.info('开始执行异步任务')
time.sleep(3)
print('------', self.request.id)
logger.info('结束执行异步任务')
return 10
4. views.py文件
from rest_framework.views import APIView
from rest_framework.response import Response
from CeleryFunc.tasks import my_asynch_task
import time
class AsyncAPIView(APIView):
"""
数据集
"""
def get(self, request, format=None):
try:
start_time = time.time()
# 这两种方法都可以,不加后面这个方法就不是异步了
# result = my_asynch_task.apply_async()
result = my_asynch_task.delay()
print(dir(result))
print('result.state = ', result.state)
print('result.status = ', result.status)
print('result.successful = ', result.successful)
print('result.result = ', result.result)
print('result.name = ', result.name)
resp = {
'code': 200,
'message': 'success',
}
end_date = time.time()
print('耗时: ', end_date - start_time)
except Exception as e:
resp = {
'code': 300,
'message': str(e),
}
return Response(resp)
后台可以启动这个命令来查看异步任务是否加入和日志
DjangoFunction是项目名称
celery worker -A DjangoFunction -l info -P eventlet
项目结构

总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
