Celery Scheduled Tasks: A Hands-On Django + Celery Project Tutorial
Author: alden_ygq
This article walks through building a scheduled-task system with Django and Celery, step by step. It is intended as a practical reference; corrections and suggestions are welcome.
I. Project Initialization
1. Create a virtual environment and install dependencies
# Create a virtual environment
python3 -m venv myenv
source myenv/bin/activate
# Install dependencies
pip install django djangorestframework celery redis django-celery-beat django-celery-results
2. Create the Django project and app
# Create the project
django-admin startproject task_manager
cd task_manager
# Create the app
python manage.py startapp tasks
3. Configure the project (task_manager/settings.py)
INSTALLED_APPS = [
    # ...
    'rest_framework',
    'django_celery_beat',
    'django_celery_results',
    'tasks',
]

# Database configuration
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

# Celery configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'  # store results via django-celery-results
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
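A few optional Celery settings are often worth adding alongside the ones above; the values below are illustrative defaults rather than requirements of this project:

# Optional (illustrative values): cap task runtime, record the STARTED state,
# and expire stored results after one day.
CELERY_TASK_TIME_LIMIT = 300          # hard time limit per task, in seconds
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXPIRES = 60 * 60 * 24  # seconds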
II. Celery Integration
1. Create the Celery app (task_manager/celery.py)
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'task_manager.settings')
app = Celery('task_manager')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
2. Initialize Celery (task_manager/__init__.py)
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
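To verify the integration, the bundled debug_task can be triggered from a Django shell. This assumes Redis, the worker, and the database migrations are already in place, as described in the later sections:

# python manage.py shell
>>> from task_manager.celery import debug_task
>>> result = debug_task.delay()   # queued to the running worker
>>> result.status                 # e.g. 'SUCCESS' once the worker has processed it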
III. Model Development
Create the task models (tasks/models.py)
from django.db import models
from django.utils import timezone


class ScheduledTask(models.Model):
    TASK_TYPES = (
        ('periodic', 'Periodic task'),
        ('one_time', 'One-time task'),
    )

    name = models.CharField('Task name', max_length=100)
    task_type = models.CharField('Task type', max_length=20, choices=TASK_TYPES)
    task_function = models.CharField('Task function', max_length=200)
    cron_expression = models.CharField('Cron expression', max_length=100, blank=True, null=True)
    interval_seconds = models.IntegerField('Interval (seconds)', blank=True, null=True)
    next_run_time = models.DateTimeField('Next run time', blank=True, null=True)
    is_active = models.BooleanField('Active', default=True)
    created_at = models.DateTimeField('Created at', auto_now_add=True)
    updated_at = models.DateTimeField('Updated at', auto_now=True)

    def __str__(self):
        return self.name

    class Meta:
        verbose_name = 'Scheduled task'
        verbose_name_plural = 'Scheduled tasks'


class TaskExecutionLog(models.Model):
    task = models.ForeignKey(ScheduledTask, on_delete=models.CASCADE, related_name='logs')
    execution_time = models.DateTimeField('Execution time', auto_now_add=True)
    status = models.CharField('Status', max_length=20, choices=(
        ('success', 'Success'),
        ('failed', 'Failed'),
    ))
    result = models.TextField('Result', blank=True, null=True)
    error_message = models.TextField('Error message', blank=True, null=True)

    def __str__(self):
        return f"{self.task.name} - {self.execution_time}"

    class Meta:
        verbose_name = 'Task execution log'
        verbose_name_plural = 'Task execution logs'
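The Key Features section later mentions managing tasks through Django Admin, but the original listing never registers the models. A minimal tasks/admin.py could look like the sketch below; the chosen list_display and filter fields are only suggestions:

from django.contrib import admin

from .models import ScheduledTask, TaskExecutionLog


@admin.register(ScheduledTask)
class ScheduledTaskAdmin(admin.ModelAdmin):
    list_display = ('name', 'task_type', 'task_function', 'is_active', 'created_at')
    list_filter = ('task_type', 'is_active')
    search_fields = ('name', 'task_function')


@admin.register(TaskExecutionLog)
class TaskExecutionLogAdmin(admin.ModelAdmin):
    list_display = ('task', 'status', 'execution_time')
    list_filter = ('status',)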
Migrate the database
python manage.py makemigrations
python manage.py migrate
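The migrate step also creates the tables used by django_celery_beat and django_celery_results. If you want to confirm that their migrations were applied, one quick check is:

python manage.py showmigrations django_celery_beat django_celery_results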
IV. API Development
1. Create serializers (tasks/serializers.py)
from rest_framework import serializers
from .models import ScheduledTask, TaskExecutionLog
class ScheduledTaskSerializer(serializers.ModelSerializer):
    class Meta:
        model = ScheduledTask
        fields = '__all__'


class TaskExecutionLogSerializer(serializers.ModelSerializer):
    class Meta:
        model = TaskExecutionLog
        fields = '__all__'
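Because interval_seconds only makes sense for periodic tasks, it may be worth adding cross-field validation to ScheduledTaskSerializer. The following validate() method is a possible addition, not part of the original code:

# Inside ScheduledTaskSerializer (illustrative addition):
def validate(self, attrs):
    """Periodic tasks must specify interval_seconds."""
    task_type = attrs.get('task_type', getattr(self.instance, 'task_type', None))
    interval = attrs.get('interval_seconds', getattr(self.instance, 'interval_seconds', None))
    if task_type == 'periodic' and not interval:
        raise serializers.ValidationError({'interval_seconds': 'Required for periodic tasks.'})
    return attrs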
2. Create viewsets (tasks/views.py)
from rest_framework import viewsets, status
from rest_framework.response import Response
from .models import ScheduledTask, TaskExecutionLog
from .serializers import ScheduledTaskSerializer, TaskExecutionLogSerializer
from celery import current_app
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
import json


class ScheduledTaskViewSet(viewsets.ModelViewSet):
    queryset = ScheduledTask.objects.all()
    serializer_class = ScheduledTaskSerializer

    def create(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        # Create the corresponding Celery scheduled task
        task = serializer.save()
        self._create_celery_task(task)
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)

    def update(self, request, *args, **kwargs):
        partial = kwargs.pop('partial', False)
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        # Update the corresponding Celery scheduled task
        task = serializer.save()
        self._update_celery_task(task)
        return Response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        instance = self.get_object()
        # Delete the corresponding Celery scheduled task
        self._delete_celery_task(instance)
        self.perform_destroy(instance)
        return Response(status=status.HTTP_204_NO_CONTENT)

    def _create_celery_task(self, task):
        if task.task_type == 'periodic':
            # Create an interval schedule
            schedule, _ = IntervalSchedule.objects.get_or_create(
                every=task.interval_seconds,
                period=IntervalSchedule.SECONDS,
            )
            PeriodicTask.objects.create(
                interval=schedule,
                name=task.name,
                task=task.task_function,
                enabled=task.is_active,
                args=json.dumps([task.id]),  # pass the ScheduledTask id to the task function
                kwargs=json.dumps({}),
            )
        elif task.task_type == 'one_time':
            # One-time tasks can be scheduled with an ETA (see the sketch after this listing)
            pass

    def _update_celery_task(self, task):
        try:
            periodic_task = PeriodicTask.objects.get(name=task.name)
            if task.task_type == 'periodic':
                schedule, _ = IntervalSchedule.objects.get_or_create(
                    every=task.interval_seconds,
                    period=IntervalSchedule.SECONDS,
                )
                periodic_task.interval = schedule
            periodic_task.enabled = task.is_active
            periodic_task.save()
        except PeriodicTask.DoesNotExist:
            self._create_celery_task(task)

    def _delete_celery_task(self, task):
        try:
            periodic_task = PeriodicTask.objects.get(name=task.name)
            periodic_task.delete()
        except PeriodicTask.DoesNotExist:
            pass


class TaskExecutionLogViewSet(viewsets.ReadOnlyModelViewSet):
    queryset = TaskExecutionLog.objects.all()
    serializer_class = TaskExecutionLogSerializer
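The one_time branch above is left as a stub, and the cron_expression field is not yet wired up even though CrontabSchedule is imported. One possible way to handle both with django-celery-beat is shown below; this is a sketch under those assumptions, not the original author's implementation, and the helper names are invented for illustration:

from django_celery_beat.models import ClockedSchedule, CrontabSchedule, PeriodicTask
import json


def create_one_time_task(task, run_at):
    """Schedule a single run at `run_at` (a timezone-aware datetime)."""
    clocked, _ = ClockedSchedule.objects.get_or_create(clocked_time=run_at)
    PeriodicTask.objects.create(
        clocked=clocked,
        one_off=True,  # beat disables the entry after it fires once
        name=task.name,
        task=task.task_function,
        enabled=task.is_active,
        args=json.dumps([task.id]),
        kwargs=json.dumps({}),
    )


def create_cron_task(task):
    """Schedule via a standard 5-field cron expression, e.g. '*/5 * * * *'."""
    minute, hour, day_of_month, month_of_year, day_of_week = task.cron_expression.split()
    crontab, _ = CrontabSchedule.objects.get_or_create(
        minute=minute,
        hour=hour,
        day_of_month=day_of_month,
        month_of_year=month_of_year,
        day_of_week=day_of_week,
    )
    PeriodicTask.objects.create(
        crontab=crontab,
        name=task.name,
        task=task.task_function,
        enabled=task.is_active,
        args=json.dumps([task.id]),
        kwargs=json.dumps({}),
    )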
3. Configure URLs (tasks/urls.py)
from django.urls import include, path
from rest_framework import routers
from .views import ScheduledTaskViewSet, TaskExecutionLogViewSet
router = routers.DefaultRouter()
router.register(r'tasks', ScheduledTaskViewSet)
router.register(r'logs', TaskExecutionLogViewSet)
urlpatterns = [
    path('', include(router.urls)),
]
4. Project URL configuration (task_manager/urls.py)
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('tasks.urls')),
]
V. Creating a Sample Task
Define the task function (tasks/tasks.py)
from celery import shared_task
from django.utils import timezone
from .models import ScheduledTask, TaskExecutionLog
import logging

logger = logging.getLogger(__name__)


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=3, retry_kwargs={'max_retries': 3})
def sample_task(self, task_id):
    try:
        task = ScheduledTask.objects.get(id=task_id)
        # Simulate doing some work
        result = f"Task {task.name} executed successfully at {timezone.now()}"
        # Record a success log entry
        TaskExecutionLog.objects.create(
            task=task,
            status='success',
            result=result
        )
        logger.info(f"Task executed successfully: {task.name}")
        return result
    except Exception as e:
        # Record a failure log entry if the task row still exists
        task = ScheduledTask.objects.filter(id=task_id).first()
        if task:
            TaskExecutionLog.objects.create(
                task=task,
                status='failed',
                error_message=str(e)
            )
        logger.error(f"Task execution failed: {e}")
        raise
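Any function decorated with @shared_task can be scheduled the same way. As an illustration (not part of the original project), a housekeeping task added to the same tasks/tasks.py module, reusing its imports, might prune old execution logs; the 30-day retention is an arbitrary choice:

from datetime import timedelta


@shared_task
def purge_old_logs(days=30):
    """Delete TaskExecutionLog rows older than `days` days and return the count."""
    cutoff = timezone.now() - timedelta(days=days)
    deleted, _ = TaskExecutionLog.objects.filter(execution_time__lt=cutoff).delete()
    logger.info(f"Purged {deleted} execution log entries older than {days} days")
    return deleted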
VI. Starting the Services
1. Start Redis
redis-server
2. Start the Celery worker
celery -A task_manager worker --loglevel=info --pool=prefork --concurrency=4
3. Start Celery Beat
celery -A task_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
4. Start the Django development server
python manage.py runserver
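If you also want to manage the schedules through Django Admin (see the Key Features section below), create an admin account first:

python manage.py createsuperuser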
VII. API Testing
1. Create a periodic task
curl -X POST http://localhost:8000/api/tasks/ \
  -H "Content-Type: application/json" \
  -d '{
    "name": "sample-periodic-task",
    "task_type": "periodic",
    "task_function": "tasks.tasks.sample_task",
    "interval_seconds": 60,
    "is_active": true
}'
2. List tasks
curl http://localhost:8000/api/tasks/
3. View execution logs
curl http://localhost:8000/api/logs/
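4. Update or deactivate a task
Tasks can also be modified through the same endpoint. For example, assuming the task created above was assigned id 1 (adjust to match your data), a partial update that disables it and its PeriodicTask entry would be:

curl -X PATCH http://localhost:8000/api/tasks/1/ \
  -H "Content-Type: application/json" \
  -d '{"is_active": false}'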
Project Structure
task_manager/
├── task_manager/
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── tasks/
│   ├── migrations/
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── models.py
│   ├── serializers.py
│   ├── tasks.py
│   ├── urls.py
│   └── views.py
├── manage.py
└── db.sqlite3
Key Features
- Dynamic task management: create/update/delete scheduled tasks through the API
- Execution logging: task results and status are recorded automatically
- Retry on failure: failed tasks are retried automatically (up to 3 times)
- Multiple scheduling modes: supports periodic and one-time tasks
- Visual administration: manage scheduled tasks through the Django Admin interface
Extension Suggestions
- Add task-parameter support so arguments can be passed when a task is created
- Implement pause/resume for tasks
- Add priority queue configuration for tasks
- Integrate a monitoring stack (e.g. Prometheus + Grafana)
- Send asynchronous notifications of task results (email, SMS, etc.)
This implementation provides a complete Django + Celery scheduled-task system with dynamic management and monitoring, and can serve as a solid starting point for production use.
Summary
The above reflects the author's personal experience and is offered as a reference; hopefully it is helpful, and thanks for supporting 脚本之家.
