python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Django使用Celery执行异步任务

Django中如何使用Celery执行异步任务

作者:G_scsd

这篇文章主要介绍了Django中如何使用Celery执行异步任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

DJango使用Celery异步任务

1. 安装

pip install celery==4.4.7

2. 配置

2.1 setting.py

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_SERIALIZER = 'json'

2.2 setting同级目录

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoFunction.settings')

app = Celery('test_celery')  # 定义全局的Celery任务 test_celery 随便写

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')  # 固定配置

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(['CeleryFunc'])  # 把需要用定时任务的APP加进去


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app  # 重命名

__all__ = ['celery_app']  # 把重命名的APP写进去

3. Django APP 中新建tasks.py文件

import time
from loguru import logger
# Django环境的初始化
# 在任务处理者一段加这几句
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoFunction.settings')
django.setup()
from DjangoFunction import celery_app


# bind=True, 加了这个就必须加 self, 这个能打印定时任务的信息(这两个都可以不加)
@celery_app.task(bind=True)
def my_asynch_task(self):
    """
    我的 异步 任务
    :return:
    """
    logger.info('开始执行异步任务')
    time.sleep(3)
    print('------', self.request.id)
    logger.info('结束执行异步任务')
    return 10

4. views.py文件

from rest_framework.views import APIView
from rest_framework.response import Response
from CeleryFunc.tasks import my_asynch_task
import time

class AsyncAPIView(APIView):
    """
    数据集
    """
    def get(self, request, format=None):
        try:
            start_time = time.time()
            # 这两种方法都可以,不加后面这个方法就不是异步了
            # result = my_asynch_task.apply_async()  
            result = my_asynch_task.delay()
            print(dir(result))
            print('result.state = ', result.state)
            print('result.status = ', result.status)
            print('result.successful = ', result.successful)
            print('result.result = ', result.result)
            print('result.name = ', result.name)
            resp = {
                'code': 200,
                'message': 'success',
            }
            end_date = time.time()
            print('耗时: ', end_date - start_time)
        except Exception as e:
            resp = {
                'code': 300,
                'message': str(e),
            }
        return Response(resp)

后台可以启动这个命令来查看异步任务是否加入和日志

DjangoFunction是项目名称

celery worker -A DjangoFunction -l info -P eventlet

项目结构

在这里插入图片描述

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文