python定时任务schedule库用法详细讲解
作者:IT之一小佬
python中有一个轻量级的定时任务调度的库schedule,下面这篇文章主要给大家介绍了关于python定时任务schedule库用法的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
前言
schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。
如果想执行多个任务,也可以添加多个task。
首先安装schedule库:
pip install schedule
1、按时间间隔执行定时任务
示例代码1:
import schedule from datetime import datetime def task(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts) def task2(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + '666!') def func(): # 清空任务 schedule.clear() # 创建一个按3秒间隔执行任务 schedule.every(3).seconds.do(task) # 创建一个按2秒间隔执行任务 schedule.every(2).seconds.do(task2) while True: schedule.run_pending() func()
运行结果:
示例代码2:
import schedule import time def job(name): print("her name is : ", name) name = "张三" schedule.every(10).minutes.do(job, name) schedule.every().hour.do(job, name) schedule.every().day.at("10:30").do(job, name) schedule.every(5).to(10).days.do(job, name) schedule.every().monday.do(job, name) schedule.every().wednesday.at("13:15").do(job, name) while True: schedule.run_pending() time.sleep(1)
参数解释:
- 每隔十分钟执行一次任务
- 每隔一小时执行一次任务
- 每天的10:30执行一次任务
- 每隔5到10天执行一次任务
- 每周一的这个时候执行一次任务
- 每周三13:15执行一次任务
- run_pending:运行所有可以运行的任务
注意:schedule方法是串行的,也就是说,如果各个任务之间时间不冲突,那是没问题的;如果时间有冲突的话,会串行的执行命令。
2、装饰器:通过 @repeat() 装饰静态方法
示例代码:
from datetime import datetime from schedule import every, repeat, run_pending @repeat(every(3).seconds) def task(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + '-333!') @repeat(every(5).seconds) def task2(): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + "-555555!") while True: run_pending()
运行结果:
3、传递参数
示例代码:
from datetime import datetime import schedule def task(s): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + s) def task2(s): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + s) schedule.every(3).seconds.do(task, s='-333') schedule.every(5).seconds.do(task, s='-555') while True: schedule.run_pending()
运行结果:
4、使用装饰器传递参数
示例代码:
from datetime import datetime from schedule import every, repeat, run_pending @repeat(every(3).seconds, '-333') def task(s): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + s) @repeat(every(5).seconds, '-555') def task2(s): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print(ts + s) while True: run_pending()
运行结果:
5、取消定时任务
示例代码:
import schedule i = 0 def some_task(): global i i += 1 print(i) if i == 5: schedule.cancel_job(job) print('cancel job') exit(0) job = schedule.every().second.do(some_task) while True: schedule.run_pending()
运行结果:
6、在指定时间执行一次任务
示例代码:
import time import schedule def job_that_executes_once(): print('Hello') return schedule.CancelJob schedule.every().minute.at(':30').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
运行结果:
7、根据标签检索任务
示例代码:
# 检索所有任务:schedule.get_jobs() import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend') print(friends)
运行结果:
8、根据标签取消任务
示例代码:
# 取消所有任务:schedule.clear() import schedule def greet(name): print('Hello {}'.format(name)) if name == 'Cancel': schedule.clear('second-tasks') print('cancel second-tasks') schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend') schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest') while True: schedule.run_pending()
运行结果:
9、运行任务到某时间
示例代码:
import schedule from datetime import datetime, timedelta, time def job(): print('working...') schedule.every().second.until('23:59').do(job) # 今天23:59停止 schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止 schedule.every().second.until(timedelta(hours=8)).do(job) # 8小时后停止 schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止 schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止 while True: schedule.run_pending()
运行结果:
10、马上运行所有任务(主要用于测试)
示例代码:
import schedule def job(): print('working...') def job1(): print('Hello...') schedule.every().monday.at('12:40').do(job) schedule.every().tuesday.at('16:40').do(job1) schedule.run_all() schedule.run_all(delay_seconds=3) # 任务间延迟3秒
运行结果:
11、并行运行:使用 Python 内置队列实现
示例代码:
import threading import time import schedule def job1(): print("I'm running on thread %s" % threading.current_thread()) def job2(): print("I'm running on thread %s" % threading.current_thread()) def job3(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3) while True: schedule.run_pending() time.sleep(1)
运行结果:
总结
到此这篇关于python定时任务schedule库用法的文章就介绍到这了,更多相关python定时任务schedule库内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!