python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > django 定时任务

django使用定时任务django_apscheduler的实现

作者:弈鸣敲代码

定时任务无论是个人开发还是企业业务都是需要的,本文主要介绍了django使用定时任务django_apscheduler的实现,减少请求时需要用户等待的时间,感兴趣的可以了解一下

问题:当数据量过大,请求时间过长的时候,使用django的缓存可以解决部分问题,但是当缓存数据过期时重新请求又是耗时的时间,请求时需要用户等待。

解决:使用定时任务加缓存的方式,在后台定时请求数据保存到缓存中,而对外的接口只负责读取缓存的数据即可,这样就不用再等待数据的请求了

django_apscheduler的运行和django项目的运行是相互独立的,即使django项目停止,django_apscheduler也会继续运行

一、安装及配置django_apscheduler

1,使用pip安装django_apscheduler

pip install -i https://pypi.douban.com/simple django_apscheduler

2,settings.py中配置INSTALLED_APPS

在INSTALLED_APPS中添加django_apscheduler

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'corsheaders',
    'rest_framework',
    'course',
    'django_apscheduler',  # 添加
]

3,生成迁移

python manage.py makemigrationspython manage.py migrate

迁移后生成两张表

7dfb31601d1b440da3f3efabc95f21cf.png

二、安装及配置django_redis

1,使用pip安装django_redis

pip install -i https://pypi.douban.com/simple django_redis

2,在settings.py中配置CACHES

# 配置缓存保存
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',  # 使用1号库
    }
}

三、代码

1,定时请求test_job函数,请求获取数据保存在redis缓存中

from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
try:
    scheduler = BackgroundScheduler()
    scheduler.add_jobstore(DjangoJobStore(), "default")
    @register_job(scheduler, "interval", seconds=10)
    def test_job():
        # 定时每5秒执行一次
        count = 0
        courses = Course.objects.values('name')
        for i in range(1000000):
            for course in courses:
                count += len(str(course["name"]))
        cache.set("word_count", count, 10)
        return JsonResponse({"count": count})
    register_events(scheduler)
    # 启动定时器
    scheduler.start()
except Exception as e:
    print('定时任务异常:%s' % str(e))

2,定义接口,获取保存的数据

 class WordCountView(View):
    def get(self, request):
        word_count = cache.get("word_count")
        if word_count:
            return JsonResponse({"count": word_count})

3,配置路由

path('count/', views.WordCountView.as_view())

四、测试访问

1,启动django

8aca817ca11745d98bcb9e2e93d1f080.png

 2,查看redis:TTL为过期时间,每10秒过期,因为定时任务也是10秒,所以就是每10秒自动刷新一次

6a4a872184ce4003a3baf7d4b111533a.png

 3,请求,时间只耗时5ms,正常请求在5秒左右

7a258eebe20144f69570b87932a2c1c6.png

五、常见问题

运行报错:Job identifier (course.views.test_job2) conflicts with an existing job

这是因为现有的工作id与数据库的id相同导致的,手动重要命名id即可

 到此这篇关于django使用定时任务django_apscheduler的实现的文章就介绍到这了,更多相关django 定时任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文