Python 4种实现定时任务的方案
作者:迟业
1.利用 while True: + sleep() 实现定时任务
位于 time 模块中的 sleep(secs)
函数,可以实现令当前执行的线程暂停 secs
秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。
基于这样的特性我们可以通过 while 死循环+sleep() 的方式实现简单的定时任务。
代码示例:
import datetime import time def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) def loop_monitor(): while True: time_printer() time.sleep(5) # 暂停 5 秒 if __name__ == "__main__": loop_monitor()
主要缺点:
- 只能设定间隔,不能指定具体的时间,比如每天早上 8:00
sleep
是一个阻塞函数,也就是说sleep
这一段时间,程序什么也不能操作。
2.使用 Timeloop 库运行定时任务
Timeloop[2]
是一个库,可用于运行多周期任务。这是一个简单的库,它使用 decorator
模式在线程中运行标记函数。
示例代码:
import time from timeloop import Timeloop from datetime import timedelta tl = Timeloop() @tl.job(interval=timedelta(seconds=2)) def sample_job_every_2s(): print "2s job current time : {}".format(time.ctime()) @tl.job(interval=timedelta(seconds=5)) def sample_job_every_5s(): print "5s job current time : {}".format(time.ctime()) @tl.job(interval=timedelta(seconds=10)) def sample_job_every_10s(): print "10s job current time : {}".format(time.ctime())
3.利用 threading.Timer 实现定时任务
threading
模块中的 Timer
是一个非阻塞函数,比 sleep
稍好一点,timer
最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。
Timer(interval, function, args=[ ], kwargs={ })
interval
: 指定的时间function
: 要执行的方法args/kwargs
: 方法的参数
代码示例:
import datetime from threading import Timer def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) loop_monitor() def loop_monitor(): t = Timer(5, time_printer) t.start() if __name__ == "__main__": loop_monitor()
备注:Timer 只能执行一次,这里需要循环调用,否则只能执行一次
4.利用内置模块 sched 实现定时任务
sched
模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。
class sched.scheduler(timefunc, delayfunc)
这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc
是一个没有参数的返回时间类型数字的函数(常用使用的如 time 模块里面的 time),delayfunc
应该是一个需要一个参数来调用、与 timefunc
的输出兼容、并且作用为延迟多个时间单位的函数(常用的如 time 模块的 sleep)。
代码示例:
import datetime import time import sched def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) loop_monitor() def loop_monitor(): s = sched.scheduler(time.time, time.sleep) # 生成调度器 s.enter(5, 1, time_printer, ()) s.run() if __name__ == "__main__": loop_monitor()
scheduler 对象主要方法:
enter(delay, priority, action, argument)
,安排一个事件来延迟 delay 个时间单位。cancel(event):
从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError
。run():
运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()
函数),然后执行事件,直到不再有预定的事件。
个人点评:比 threading.Timer 更好,不需要循环调用。
到此这篇关于Python 4种实现定时任务的方案的文章就介绍到这了,更多相关Python 实现定时任务方案内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!