Python Apschedule定时任务框架的用法详解
作者:一夜奈何梁山
一: 安装
pip install apscheduler
二: 基本概念
1: 触发器: 调度逻辑,描述任务何时被触发。(日期触发,时间间隔,cronjob表达式)
2: 作业存储器:指定作业存储的位置,默认是保存在内存中,
3: 执行器:将任务(函数)提交到线程池或者进程持中运行,当任务完成时,通知调度器发生相应的事件。
4:调度器:任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通
常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
三:基本案例
任务一: 每隔三秒钟执行一次。
from datetime import datetime import os # 1: 导入这个最简单的调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 2: 定义我们的job def tick(): print("Tick! The time is: %s" % datetime.now()) if __name__ == '__main__': # 3: 实例化BlockingScheduler调度器,没有参数表示存储器是:内存 # 执行器是线程池,默认的线程并发数是10个。 scheduler = BlockingScheduler() # 4:调度器绑定任务,并指定触发器 # 触发器:‘interval'表示间隔执行, ‘date', 表示指定时间触发, ‘cron'表示固定时间间隔触发。 scheduler.add_job(tick, 'interval', seconds=3) try: # 5:执行任务 scheduler.start() except Exception as e: print(e)
任务二: 每天固定时间执行:
from datetime import datetime import os # 1: 导入这个最简单的调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 2: 定义我们的job def tick(): print("Tick! The time is: %s" % datetime.now()) if __name__ == '__main__': # 3: 实例化BlockingScheduler调度器,没有参数表示存储器是:内存 # 执行器是线程池,默认的线程并发数是10个。 scheduler = BlockingScheduler() # 4:调度器绑定任务,并指定触发器 # 每天10点10分执行 # scheduler.add_job(tick, trigger='cron', hour=10, minute=10) # 每天 10点10分,11点10分,12点10分执行 scheduler.add_job(tick, trigger='cron', hour='10-12', minute=10) try: # 5:执行任务 scheduler.start() except Exception as e: print(e)
四: 调度器
1:调度器的执行原理:
循环询问作业存储器,有没有到期要执行的任务,如果有则计算运行的时间点。
交给执行器按照时间点运行。
2:调度器的分类:
1: BlockingScheduler: 调用start后会阻塞主线程。
2:BackgroundScheduler: 调用start后默认开启守护线程,不会阻塞主线程。
3:AsyncIOScheduler: 与AsyncIO配合使用
4: GeventScheduler: 与Gevent配合使用
5: TwistedScheduler: 与Twisted配合使用
6: QtScheduler: 与Qt配合使用
五: 作业存储器
内存:程序崩溃,则重启时,重新加入任务。
数据库:程序崩溃,重启时,恢复中断的状态。推荐使用:PostgreSQL
六: 执行器
1: 线程池执行器:默认
2:进程池执行器:CPU密集型
3:线程池+进程池执行器:
七:多执行器,存储器,单调度器案例:
案例: 配置两个存储器:一个mongodb,一个sqlite。配置一个线程池执行器和一个进程池执行器。
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstorage = { "mongodb": MongoDBJobStore(), "default": SQLAlchemyJobStore(url='mongodb数据库地址') } executes = { "default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5) } job_default = { # coalesce默认情况下关闭 'coalesce': False, # 作业的默认最大运行实例限制为3 "max_instances": 3 } scheduler = BackgroundScheduler(jobstores=jobstorage, executors=executes, job_defaults=job_default, timezone=utc)
八: 不同存储器执行案例
1: 内存存储
from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime def my_job(id= 'my_job'): print(id, '--->', datetime.now()) # 1: 定义任务存储器为内存,其实默认的也是这个 jobstorage = { "default": MemoryJobStore() } # 2:定义执行器, 10进程20线程执行 execytors = { "default": ThreadPoolExecutor(20), "processpoll": ProcessPoolExecutor(10) } # 3:定义任务设置 job_defaults = { 'coalesce': False, 'max_instances': 3 } # 4: 实例化调度器 scheduler = BlockingScheduler(jobstorage=jobstorage, execytors=execytors, job_defaults=job_defaults) # 5: 给调度器增加任务 # 每5分钟执行一次 scheduler.add_job(my_job, args=['job_interval'], id='job_interval', trigger='interval', seconds=5, replace_existing=True) # 截止到2021-10-25日前,每个4月到8月的每天7点到11点,每10分钟执行一次 scheduler.add_job(my_job, args=['job_cron'], id='job_cron', trigger='cron', month='4-8,11-12', hour='7-11', second='*/10', end_date='2021-10-25') # 默认的配置:立刻执行一次 scheduler.add_job(my_job, args=['job_once_now'], id='job_once_now') # 某个具体节点,执行一次 scheduler.add_job(my_job, args=['job_date_once'], id='job_date_once', trigger='date', run_date='2021-08-05 07:48:05')
数据库存储:
上面代码只需要修改存储器即可:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime def my_job(id= 'my_job'): print(id, '--->', datetime.now()) # 1: 定义任务存储器为内存,其实默认的也是这个 jobstorage = { "default": SQLAlchemyJobStore(url="数据库地址") }
数据库存储存在的问题:
1:假如程序中断了,则再次开启的时候,调度器会将数据库中没有执行的任务再次添加进来。如果我们此时再次运行程序,则优惠追加进来相同的任务。如何让他不再加进来呢?
在追加任务的时候增加配置项:replace_existing=True
scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval',seconds=3,replace_existing=True)
2:如果程序错过了我们指定的时间,我们就不让他运行了则可以增加配置项:misfire_grace_time
scheduler.add_job(my_job,args = ['job_cron',] ,id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11',second='*/15',coalesce=True,misfire_grace_time=30,replace_existing=True,end_date='2018-05-30')
这里是设置如果时间相差超过30秒了,则这个任务就不执行了。
九: 调度器的其他操作
scheduler.remove_job(job_id, jobstore=None) # 删除作业 scheduler.remove_all_jobs(jobstore=None) # 删除所有作业 scheduler.pause_job(job_id, jobstore=None) # 暂停作业 scheduler.resume_job(job_id, jobstore=None) # 恢复作业 scheduler.modify_job(job_id, jobstore=None, **changes) # 修改单个作业属性信息 scheduler.reschedule_job(job_id, jobstore=None, trigger=None, **trigger_args) # 修改单个作业的触发器并更新下次运行时间 scheduler.print_jobs(jobstore=None, out=sys.stdout) # 输出作业信息
十: 调度事件监听
问题1: 如果程序出现异常,会影响整个调度任务吗?
from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print(1/0) print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5') scheduler.start()
运行这个程序会发现每5分钟报一次错。
问题2:如果程序的一个任务出现异常,其余的任务能正常执行吗?
from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print(1/0) print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def aps_test2(x): print('哈哈哈哈') scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5') scheduler.add_job(func=aps_test2, args=('定时任务2',), trigger='cron', second='*/5') scheduler.start()
运行发现,任务一报错,任务二仍然可以运行。
设置日志记录和事件监听:
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging # 1: 定义日志格式: # %(levelno)s 打印日志级别的数值 # %(levelname)s 打印日志级别名称 # %(pathname)s 打印当前执行程序的路径,其实就是sys.argv[0] # %(filename)s 打印当前执行程序名 # %(funcName)s 打印日志的当前函数 # %(lineno)d 打印日志的当前行号 # %(asctime)s 打印日志的记录时间 # %(thread)d 打印线程ID # %(threadName)s 打印线程的名称 # %(process)d 打印进程的ID # %(message)s 打印日志的信息 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') # 正确的任务 def aps_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) # 出错的任务 def date_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) print(1 / 0) # 2: 设置监听器 def my_listener(event): if event.exception: print('任务出错了!!!!!!') else: print('任务照常运行...') scheduler = BlockingScheduler() scheduler.add_job(func=date_test, args=('一次性任务,会出错',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task') # 每3秒执行一次 scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger = logging scheduler.start()
以上就是Python Apschedule定时任务框架的用法详解的详细内容,更多关于Python Apschedule定时任务的资料请关注脚本之家其它相关文章!