django+celery如何实现定时拉取阿里云rocketmq实例信息
作者:alden_ygq
这篇文章主要介绍了django+celery如何实现定时拉取阿里云rocketmq实例信息,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
一、项目初始化
1. 创建虚拟环境并安装依赖
# 创建虚拟环境 python3 -m venv env source env/bin/activate # 安装依赖 pip install django celery redis django-celery-beat aliyun-python-sdk-core-v3 aliyun-python-sdk-mq mysqlclient
2. 创建 Django 项目和应用
# 创建项目 django-admin startproject rocketmq_manager cd rocketmq_manager # 创建应用 python manage.py startapp rocketmq
3. 配置 MySQL 数据库(rocketmq_manager/settings.py)
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'rocketmq_manager', # 数据库名
'USER': 'your_username', # 用户名
'PASSWORD': 'your_password', # 密码
'HOST': 'localhost', # 主机
'PORT': '3306', # 端口
'OPTIONS': {
'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
},
}
}
4. 配置项目其他设置(rocketmq_manager/settings.py)
INSTALLED_APPS = [
# ...
'django_celery_beat',
'django_celery_results',
'rocketmq',
]
# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
# 阿里云配置(从环境变量获取)
ALIYUN_ACCESS_KEY_ID = os.environ.get('ALIYUN_ACCESS_KEY_ID')
ALIYUN_ACCESS_KEY_SECRET = os.environ.get('ALIYUN_ACCESS_KEY_SECRET')
ALIYUN_REGION_ID = os.environ.get('ALIYUN_REGION_ID', 'cn-hangzhou')
二、Celery 集成配置
1. 创建 Celery 应用(rocketmq_manager/celery.py)
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'rocketmq_manager.settings')
app = Celery('rocketmq_manager')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
2. 初始化 Celery(rocketmq_manager/__init__.py)
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
三、Model 开发
创建 RocketMQ 实例模型(rocketmq/models.py)
python
运行
from django.db import models
from django.utils import timezone
class RocketMQInstance(models.Model):
instance_id = models.CharField('实例ID', max_length=100, unique=True)
instance_name = models.CharField('实例名称', max_length=200, blank=True, null=True)
instance_type = models.CharField('实例类型', max_length=50, blank=True, null=True)
region_id = models.CharField('区域ID', max_length=50)
status = models.CharField('状态', max_length=50, blank=True, null=True)
create_time = models.DateTimeField('创建时间', blank=True, null=True)
expire_time = models.DateTimeField('过期时间', blank=True, null=True)
tags = models.JSONField('标签', blank=True, null=True)
last_updated = models.DateTimeField('最后更新时间', auto_now=True)
def __str__(self):
return f"{self.instance_name} ({self.instance_id})"
class Meta:
verbose_name = 'RocketMQ实例'
verbose_name_plural = 'RocketMQ实例列表'
indexes = [
models.Index(fields=['instance_id', 'region_id']),
]
class InstanceSyncLog(models.Model):
sync_time = models.DateTimeField('同步时间', auto_now_add=True)
instance_count = models.IntegerField('实例数量', default=0)
success = models.BooleanField('是否成功', default=True)
error_message = models.TextField('错误信息', blank=True, null=True)
execution_time = models.FloatField('执行时间(秒)', blank=True, null=True)
def __str__(self):
return f"同步记录 - {self.sync_time}"
class Meta:
verbose_name = '实例同步日志'
verbose_name_plural = '实例同步日志列表'
ordering = ['-sync_time']
迁移数据库
python manage.py makemigrations python manage.py migrate
四、定时任务代码
创建阿里云 API 客户端(rocketmq/aliyun_client.py)
import os
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkmq.model.v20190513 import DescribeInstancesRequest
import json
import time
class AliyunRocketMQClient:
def __init__(self):
self.access_key_id = os.environ.get('ALIYUN_ACCESS_KEY_ID')
self.access_key_secret = os.environ.get('ALIYUN_ACCESS_KEY_SECRET')
self.region_id = os.environ.get('ALIYUN_REGION_ID', 'cn-hangzhou')
self.client = AcsClient(self.access_key_id, self.access_key_secret, self.region_id)
def get_instances(self):
try:
request = DescribeInstancesRequest.DescribeInstancesRequest()
request.set_accept_format('json')
# 添加重试机制
max_retries = 3
for attempt in range(max_retries):
try:
response = self.client.do_action_with_exception(request)
return json.loads(response)
except (ClientException, ServerException) as e:
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 2
print(f"请求失败,{wait_time}秒后重试: {str(e)}")
time.sleep(wait_time)
else:
raise
except Exception as e:
print(f"获取实例信息失败: {str(e)}")
raise
定义定时任务(rocketmq/tasks.py)
from celery import shared_task
from .models import RocketMQInstance, InstanceSyncLog
from .aliyun_client import AliyunRocketMQClient
import logging
from datetime import datetime
import time
logger = logging.getLogger(__name__)
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 3})
def sync_rocketmq_instances(self):
start_time = time.time()
try:
client = AliyunRocketMQClient()
response = client.get_instances()
# 处理响应数据
instance_list = []
if 'Data' in response and 'InstanceDoList' in response['Data']:
for item in response['Data']['InstanceDoList']:
instance = {
'instance_id': item.get('InstanceId', ''),
'instance_name': item.get('InstanceName', ''),
'instance_type': item.get('InstanceType', ''),
'region_id': item.get('RegionId', ''),
'status': item.get('InstanceStatus', ''),
'create_time': datetime.fromtimestamp(item.get('CreateTime', 0) / 1000) if item.get('CreateTime') else None,
'expire_time': datetime.fromtimestamp(item.get('ExpireTime', 0) / 1000) if item.get('ExpireTime') else None,
'tags': item.get('Tags', {})
}
instance_list.append(instance)
# 使用事务批量更新数据库
from django.db import transaction
with transaction.atomic():
# 先删除不存在的实例(可选)
# existing_ids = [item['instance_id'] for item in instance_list]
# RocketMQInstance.objects.exclude(instance_id__in=existing_ids).delete()
# 批量更新或创建实例
for instance_data in instance_list:
RocketMQInstance.objects.update_or_create(
instance_id=instance_data['instance_id'],
defaults=instance_data
)
execution_time = time.time() - start_time
# 记录同步日志
log = InstanceSyncLog.objects.create(
instance_count=len(instance_list),
success=True,
execution_time=execution_time
)
logger.info(f"成功同步 {len(instance_list)} 个RocketMQ实例,耗时: {execution_time:.2f}秒")
return f"同步完成,共 {len(instance_list)} 个实例,耗时: {execution_time:.2f}秒"
except Exception as e:
execution_time = time.time() - start_time
# 记录错误日志
InstanceSyncLog.objects.create(
success=False,
error_message=str(e),
execution_time=execution_time
)
logger.error(f"同步RocketMQ实例失败: {str(e)},耗时: {execution_time:.2f}秒")
raise
五、接口开发
1. 创建序列化器(rocketmq/serializers.py)
from rest_framework import serializers
from .models import RocketMQInstance, InstanceSyncLog
class RocketMQInstanceSerializer(serializers.ModelSerializer):
class Meta:
model = RocketMQInstance
fields = '__all__'
read_only_fields = ['last_updated']
class InstanceSyncLogSerializer(serializers.ModelSerializer):
class Meta:
model = InstanceSyncLog
fields = '__all__'
read_only_fields = ['sync_time', 'instance_count', 'success', 'error_message', 'execution_time']
2. 创建视图集(rocketmq/views.py)
from rest_framework import viewsets, status
from rest_framework.response import Response
from .models import RocketMQInstance, InstanceSyncLog
from .serializers import RocketMQInstanceSerializer, InstanceSyncLogSerializer
from .tasks import sync_rocketmq_instances
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.authentication import TokenAuthentication
class RocketMQInstanceViewSet(viewsets.ModelViewSet):
queryset = RocketMQInstance.objects.all()
serializer_class = RocketMQInstanceSerializer
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated]
@action(detail=False, methods=['post'])
def sync_now(self, request):
"""立即触发实例同步"""
task = sync_rocketmq_instances.delay()
return Response({'task_id': task.id, 'message': '同步任务已启动'}, status=status.HTTP_202_ACCEPTED)
@action(detail=False, methods=['get'])
def regions(self, request):
"""获取所有区域列表"""
regions = RocketMQInstance.objects.values_list('region_id', flat=True).distinct()
return Response(regions, status=status.HTTP_200_OK)
class InstanceSyncLogViewSet(viewsets.ReadOnlyModelViewSet):
queryset = InstanceSyncLog.objects.all().order_by('-sync_time')
serializer_class = InstanceSyncLogSerializer
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated]
3. 配置 URL(rocketmq/urls.py)
from django.urls import include, path
from rest_framework import routers
from .views import RocketMQInstanceViewSet, InstanceSyncLogViewSet
router = routers.DefaultRouter()
router.register(r'instances', RocketMQInstanceViewSet)
router.register(r'sync-logs', InstanceSyncLogViewSet)
urlpatterns = [
path('', include(router.urls)),
]
4. 项目 URL 配置(rocketmq_manager/urls.py)
from django.contrib import admin
from django.urls import path, include
from rest_framework.authtoken.views import obtain_auth_token
urlpatterns = [
path('admin/', admin.site.urls),
path('api/', include('rocketmq.urls')),
path('api/token/', obtain_auth_token, name='api_token_auth'), # 获取认证令牌
]
六、配置定时任务
在settings.py中添加定时任务配置
CELERY_BEAT_SCHEDULE = {
'sync-rocketmq-instances': {
'task': 'rocketmq.tasks.sync_rocketmq_instances',
'schedule': 3600.0, # 每小时执行一次
'args': ()
},
}
七、启动服务
1. 设置环境变量
export ALIYUN_ACCESS_KEY_ID=your_access_key_id export ALIYUN_ACCESS_KEY_SECRET=your_access_key_secret export ALIYUN_REGION_ID=cn-hangzhou # 根据实际情况修改
2. 启动 Redis
redis-server
3. 启动 Celery Worker
celery -A rocketmq_manager worker --loglevel=info --pool=prefork --concurrency=4
4. 启动 Celery Beat
celery -A rocketmq_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
5. 启动 Django 开发服务器
python manage.py runserver
八、API 测试
1. 获取认证令牌
curl -X POST -d "username=your_username&password=your_password" http://localhost:8000/api/token/
2. 获取 RocketMQ 实例列表
curl -H "Authorization: Token your_token_here" http://localhost:8000/api/instances/
3. 获取同步日志
curl -H "Authorization: Token your_token_here" http://localhost:8000/api/sync-logs/
4. 手动触发同步
curl -X POST -H "Authorization: Token your_token_here" http://localhost:8000/api/instances/sync_now/
项目结构
rocketmq_manager/ ├── rocketmq_manager/ │ ├── __init__.py │ ├── celery.py │ ├── settings.py │ ├── urls.py │ └── wsgi.py ├── rocketmq/ │ ├── migrations/ │ ├── __init__.py │ ├── admin.py │ ├── apps.py │ ├── aliyun_client.py │ ├── models.py │ ├── serializers.py │ ├── tasks.py │ ├── urls.py │ └── views.py ├── manage.py └── db.sqlite3
关键特性说明
- MySQL 存储:使用 MySQL 数据库存储 RocketMQ 实例信息和同步日志
- 定时同步:每小时自动拉取阿里云 RocketMQ 实例信息
- 数据持久化:将实例信息存储到数据库,支持索引加速查询
- 手动触发:提供 API 接口支持手动触发同步
- 错误处理:任务失败自动重试,记录详细的同步日志和执行时间
- 权限控制:使用 Token 认证保护 API 接口
扩展建议
- 添加更多阿里云 API 调用,获取更详细的实例指标(如 TPS、消息堆积量等)
- 实现多区域支持,同时监控多个地域的 RocketMQ 实例
- 添加告警机制,当实例状态异常或同步失败时发送通知
- 集成缓存系统(如 Redis),提高接口响应速度
- 添加 API 限流功能,防止恶意请求
- 实现实例信息的导出功能,支持数据报表生成
这个实现提供了一个完整的 Django+Celery 定时拉取阿里云 RocketMQ 实例信息的解决方案,使用 MySQL 存储数据,支持权限控制和手动触发同步,可直接用于生产环境。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
