python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Django+Celery项目实战

Celery定时任务组件之Django+Celery项目实战教程

作者:alden_ygq

这篇文章主要介绍了Celery定时任务组件之Django+Celery项目实战,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

一、项目初始化

1. 创建虚拟环境并安装依赖

# Create and activate a virtual environment
python3 -m venv myenv
source myenv/bin/activate

# Install dependencies.
# django-celery-results backs the 'django-db' result backend configured in
# settings.py, and djangorestframework provides the serializers/viewsets
# used by the API layer.
pip install django celery redis django-celery-beat django-celery-results djangorestframework

2. 创建 Django 项目和应用

# Create the Django project and enter its directory
django-admin startproject task_manager
cd task_manager

# Create the app that will hold the task models, API and Celery tasks
python manage.py startapp tasks

3. 配置项目(task_manager/settings.py)

INSTALLED_APPS = [
    # ...
    # Django REST framework: required by the serializers/viewsets of the
    # tasks API (and by its browsable API templates).
    'rest_framework',
    'django_celery_beat',
    'django_celery_results',
    'tasks',
]

# Database configuration: a single SQLite database stored in the file
# db.sqlite3 under BASE_DIR.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

# Celery configuration (all keys share the CELERY_ prefix)
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis database 0 on localhost as the broker
CELERY_RESULT_BACKEND = 'django-db'  # store results through django-celery-results
CELERY_ACCEPT_CONTENT = ['json']  # only accept JSON-encoded messages
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'

二、Celery 集成配置

1. 创建 Celery 应用(task_manager/celery.py)

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Point Django at the project's settings module unless the environment
# already specifies one; this must happen before the app reads its config.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'task_manager.settings')

app = Celery('task_manager')
# Load configuration from Django settings; with namespace='CELERY' only
# settings prefixed with CELERY_ are picked up.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover task modules in the installed Django apps.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    """Debug helper: print the repr of the request this task is running under."""
    request_repr = repr(self.request)
    print('Request: ' + request_repr)

2. 初始化 Celery(task_manager/__init__.py)

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

# Publish the imported Celery app as this package's public name, so it is
# loaded whenever the task_manager package is imported.
__all__ = ('celery_app',)

三、Model 开发

创建任务模型(tasks/models.py)

from django.db import models
from django.utils import timezone

class ScheduledTask(models.Model):
    """User-defined scheduled task that the API mirrors into django-celery-beat."""

    # Allowed values for ``task_type`` with their display labels.
    TASK_TYPES = (
        ('periodic', '周期性任务'),
        ('one_time', '一次性任务'),
    )

    # The name is the key used to find the matching django-celery-beat
    # PeriodicTask, whose own name column is unique; enforce the same
    # constraint here so two tasks can never map to the same beat entry.
    name = models.CharField('任务名称', max_length=100, unique=True)
    task_type = models.CharField('任务类型', max_length=20, choices=TASK_TYPES)
    # Name of the Celery task to run, e.g. 'tasks.tasks.sample_task'.
    task_function = models.CharField('任务函数', max_length=200)
    cron_expression = models.CharField('Cron表达式', max_length=100, blank=True, null=True)
    interval_seconds = models.IntegerField('间隔秒数', blank=True, null=True)
    next_run_time = models.DateTimeField('下次执行时间', blank=True, null=True)
    is_active = models.BooleanField('是否激活', default=True)
    created_at = models.DateTimeField('创建时间', auto_now_add=True)
    updated_at = models.DateTimeField('更新时间', auto_now=True)

    def __str__(self):
        """Return the task name as the display representation."""
        return self.name

    class Meta:
        verbose_name = '定时任务'
        verbose_name_plural = '定时任务列表'

class TaskExecutionLog(models.Model):
    """One execution record (success or failure) of a ScheduledTask."""

    # Logs are removed together with their task; reachable as ``task.logs``.
    task = models.ForeignKey(ScheduledTask, on_delete=models.CASCADE, related_name='logs')
    # Set automatically when the log row is created.
    execution_time = models.DateTimeField('执行时间', auto_now_add=True)
    status = models.CharField('执行状态', max_length=20, choices=(
        ('success', '成功'),
        ('failed', '失败'),
    ))
    result = models.TextField('执行结果', blank=True, null=True)
    error_message = models.TextField('错误信息', blank=True, null=True)

    def __str__(self):
        """Return '<task name> - <execution time>'."""
        return f"{self.task.name} - {self.execution_time}"

    class Meta:
        verbose_name = '任务执行日志'
        verbose_name_plural = '任务执行日志列表'

迁移数据库

# Generate migration files for the model changes, then apply all migrations
python manage.py makemigrations
python manage.py migrate

四、接口开发

1. 创建序列化器(tasks/serializers.py)

from rest_framework import serializers
from .models import ScheduledTask, TaskExecutionLog

class ScheduledTaskSerializer(serializers.ModelSerializer):
    """Serialize every ``ScheduledTask`` field and validate its schedule settings."""

    class Meta:
        model = ScheduledTask
        fields = '__all__'

    def validate(self, attrs):
        """Require a positive ``interval_seconds`` for periodic tasks.

        The model allows ``interval_seconds`` to be empty, but a periodic task
        cannot be scheduled without it. Rejecting the request here returns a
        400 response instead of failing later while the schedule is created.

        On partial updates, fields missing from ``attrs`` fall back to the
        values of the instance being updated.

        Raises:
            serializers.ValidationError: if the task is periodic and the
                effective ``interval_seconds`` is missing or not positive.
        """
        def effective(field):
            if field in attrs:
                return attrs[field]
            # ``self.instance`` is None on create, so this yields None.
            return getattr(self.instance, field, None)

        if effective('task_type') == 'periodic':
            interval = effective('interval_seconds')
            if interval is None or interval <= 0:
                raise serializers.ValidationError({
                    'interval_seconds': '周期性任务必须提供大于 0 的间隔秒数',
                })
        return attrs

class TaskExecutionLogSerializer(serializers.ModelSerializer):
    """Serialize every field of a ``TaskExecutionLog`` record."""

    class Meta:
        model = TaskExecutionLog
        fields = '__all__'

2. 创建视图集(tasks/views.py)

import json

from celery import current_app
from django.db import transaction
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
from rest_framework import viewsets, status
from rest_framework.exceptions import ValidationError
from rest_framework.response import Response

from .models import ScheduledTask, TaskExecutionLog
from .serializers import ScheduledTaskSerializer, TaskExecutionLogSerializer

class ScheduledTaskViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for ``ScheduledTask`` that keep django-celery-beat in sync.

    Each periodic ``ScheduledTask`` is mirrored by a ``PeriodicTask`` row with
    the same name. One-time tasks are stored but get no beat entry.
    """
    queryset = ScheduledTask.objects.all()
    serializer_class = ScheduledTaskSerializer

    def create(self, request, *args, **kwargs):
        """Validate and store a new task, then register its beat entry."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Save the row and its beat entry together, so a scheduling failure
        # does not leave a ScheduledTask without a schedule behind.
        with transaction.atomic():
            task = serializer.save()
            self._create_celery_task(task)

        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)

    def update(self, request, *args, **kwargs):
        """Validate and store changes to a task, then update its beat entry."""
        partial = kwargs.pop('partial', False)
        instance = self.get_object()
        # Remember the name before saving: the beat entry is matched by name,
        # so a rename must be looked up under the old one.
        previous_name = instance.name
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)

        with transaction.atomic():
            task = serializer.save()
            self._update_celery_task(task, previous_name=previous_name)

        return Response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        """Delete a task together with its beat entry."""
        instance = self.get_object()

        with transaction.atomic():
            self._delete_celery_task(instance)
            self.perform_destroy(instance)

        return Response(status=status.HTTP_204_NO_CONTENT)

    def _interval_schedule_for(self, task):
        """Return the ``IntervalSchedule`` for ``task.interval_seconds``.

        Raises:
            ValidationError: if the interval is missing or not positive.
        """
        seconds = task.interval_seconds
        if seconds is None or seconds <= 0:
            raise ValidationError({
                'interval_seconds': '周期性任务必须提供大于 0 的间隔秒数',
            })
        schedule, _ = IntervalSchedule.objects.get_or_create(
            every=seconds,
            period=IntervalSchedule.SECONDS,
        )
        return schedule

    def _create_celery_task(self, task):
        """Create the ``PeriodicTask`` for a periodic task.

        One-time tasks are not scheduled here.
        """
        if task.task_type != 'periodic':
            return

        PeriodicTask.objects.create(
            interval=self._interval_schedule_for(task),
            name=task.name,
            task=task.task_function,
            enabled=task.is_active,
            # The task function receives the ScheduledTask id as its only
            # positional argument (see sample_task(self, task_id)); with an
            # empty argument list that task could not be called.
            args=json.dumps([task.id]),
            kwargs=json.dumps({}),
        )

    def _update_celery_task(self, task, previous_name=None):
        """Bring the beat entry in line with ``task`` after an update.

        Args:
            task: the saved ``ScheduledTask``.
            previous_name: the task's name before the update; defaults to the
                current name when omitted.
        """
        lookup_name = task.name if previous_name is None else previous_name
        try:
            periodic_task = PeriodicTask.objects.get(name=lookup_name)
        except PeriodicTask.DoesNotExist:
            self._create_celery_task(task)
            return

        if task.task_type != 'periodic':
            # A task that is no longer periodic must stop running on its old
            # interval; this matches creation, which adds no entry for it.
            periodic_task.delete()
            return

        periodic_task.interval = self._interval_schedule_for(task)
        periodic_task.name = task.name
        periodic_task.task = task.task_function
        periodic_task.args = json.dumps([task.id])
        periodic_task.enabled = task.is_active
        periodic_task.save()

    def _delete_celery_task(self, task):
        """Delete the beat entry named after ``task``, if one exists."""
        try:
            periodic_task = PeriodicTask.objects.get(name=task.name)
        except PeriodicTask.DoesNotExist:
            # Nothing was scheduled (e.g. a one-time task): nothing to remove.
            return
        periodic_task.delete()

class TaskExecutionLogViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only (list/retrieve) endpoints over all task execution logs."""
    queryset = TaskExecutionLog.objects.all()
    serializer_class = TaskExecutionLogSerializer

3. 配置 URL(tasks/urls.py)

from django.urls import include, path
from rest_framework import routers
from .views import ScheduledTaskViewSet, TaskExecutionLogViewSet

# Register both viewsets on a DRF router, under the 'tasks' and 'logs' prefixes.
router = routers.DefaultRouter()
router.register(r'tasks', ScheduledTaskViewSet)
router.register(r'logs', TaskExecutionLogViewSet)

# Mount the router's URLs at the root of this app's URLconf.
urlpatterns = [
    path('', include(router.urls)),
]

4. 项目 URL 配置(task_manager/urls.py)

from django.contrib import admin
from django.urls import path, include

# Project routes: the Django admin under admin/ and the tasks app under api/.
urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('tasks.urls')),
]

五、创建示例任务

定义任务函数(tasks/tasks.py)

from celery import shared_task
from .models import ScheduledTask, TaskExecutionLog
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=3, retry_kwargs={'max_retries': 3})
def sample_task(self, task_id):
    """Run the sample job for a ``ScheduledTask`` and record the outcome.

    On success a 'success' ``TaskExecutionLog`` is written and the result
    message is returned. On any exception a 'failed' log is written (when the
    task row still exists) and the exception is re-raised, which lets the
    ``autoretry_for`` setting retry the task up to 3 times with backoff.

    Args:
        task_id: primary key of the ``ScheduledTask`` to run.

    Returns:
        The result message stored in the success log.
    """
    try:
        task = ScheduledTask.objects.get(id=task_id)

        # Simulated work
        result = f"任务 {task.name} 执行成功,时间:{str(self.request.time_start)}"

        # Record the successful execution
        TaskExecutionLog.objects.create(
            task=task,
            status='success',
            result=result
        )

        logger.info(f"任务执行成功: {task.name}")
        return result

    except Exception as e:
        # Look the task up again with a single query: ``task`` may be unbound
        # here if the initial lookup itself failed. first() returns None when
        # the row does not exist.
        task = ScheduledTask.objects.filter(id=task_id).first()
        if task is not None:
            TaskExecutionLog.objects.create(
                task=task,
                status='failed',
                error_message=str(e)
            )
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("任务执行失败: %s", e)
        raise

六、启动服务

1. 启动 Redis

# Start the Redis server that the Celery broker URL (redis://localhost:6379/0) points to
redis-server

2. 启动 Celery Worker

# Start a Celery worker for the task_manager app: prefork pool, concurrency 4, info-level logs
celery -A task_manager worker --loglevel=info --pool=prefork --concurrency=4

3. 启动 Celery Beat

# Start Celery beat with django-celery-beat's DatabaseScheduler as the scheduler
celery -A task_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler

4. 启动 Django 开发服务器

# Start the Django development server
python manage.py runserver

七、API 测试

1. 创建周期性任务

# POST a JSON payload that creates an active periodic task which runs
# tasks.tasks.sample_task every 60 seconds
curl -X POST http://localhost:8000/api/tasks/ -d '{
    "name": "示例周期性任务",
    "task_type": "periodic",
    "task_function": "tasks.tasks.sample_task",
    "interval_seconds": 60,
    "is_active": true
}' -H "Content-Type: application/json"

2. 查看任务列表

# List all scheduled tasks
curl http://localhost:8000/api/tasks/

3. 查看执行日志

# List all task execution logs
curl http://localhost:8000/api/logs/

项目结构

task_manager/
├── task_manager/
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── tasks/
│   ├── migrations/
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── models.py
│   ├── serializers.py
│   ├── tasks.py
│   ├── urls.py
│   └── views.py
├── manage.py
└── db.sqlite3

关键特性说明

扩展建议

这个实现提供了一个 Django+Celery 定时任务系统的基础骨架,支持通过 API 动态管理周期性任务并记录执行日志。需要注意的是,示例中一次性任务和 Cron 表达式调度尚未实现,接口没有鉴权,数据库使用的是 SQLite;用于生产环境前应先补齐这些部分。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文