python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python中实现定时任务

Python中实现定时任务详解

作者:hd092336

这篇文章主要介绍了Python中实现定时任务详解的相关资料,需要的朋友可以参考下

在项目中,我们可能遇到有定时任务的需求。

今天,我跟大家分享下 Python 定时任务的实现方法。

固定时间间隔执行任务

import time
import logging

logging.basicConfig(
    level=logging.debug,
    format="%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s - %(message)s"
)


def task():
    logging.info("Task Start.")
    time.sleep(1)
    logging.info("Task Done.")

time.sleep

第一种办法是最简单又最暴力。
那就是在一个死循环中,使用线程睡眠函数 sleep()。

while True:
    task()
    time.sleep(5)

上述的方法有几个问题:

threading.Timer

既然第一种方法暴力,那么有没有比较优雅点的方法?
Python 标准库 threading 中有个 Timer 类。
它会新启动一个线程来执行定时任务,所以它是非阻塞函式。

原理:线程中预置一个finished的事件,通过finished.wait等待固定时间间隔。
超时则执行任务。如果需要取消任务,可以调用Timer.cancel来取消任务。

from threading import Timer

t = Timer(task, 5)
t.start()

优点:
不阻塞主进程,task在线程中执行

缺点:
一个Timer只能执行一次task就结束了。
如果想循环,需要改造一下task函数。

from threading import Timer

def repeat_task():
    t = Timer(5, repeat_task)
    # 开始任务的位置决定了是任务之间等待固定间隔时间
    # 还是每个任务的开始等待固定间隔时间
    t.start()
    task()

这样可以循环执行,但是仍然只能一个线程一个任务。

Q:如何跳出循环
A:加入一个标识符:

  • sleep可以用一个布尔值来控制
  • Timer可以用一个Event来控制

固定时间点执行任务

以上是用内置的方法中比较简单的实现方式。简单的功能可以实现。

前面执行的都是指定间隔时间的定时任务,那怎么执行指定时间点的任务呢?

上面的方法是可以做到的。有两种思路,

思路有了,但是对于固定时间点执行任务的场景以及后面更复杂的场景,自己实现可能就变得复杂。

下面再介绍几个进阶的定时任务的实现方式,可以适应更复杂的业务场景。

sched

第三种方式是使用标准库中sched模块。sched 是事件调度器,
它通过 scheduler 类来调度事件,从而达到定时执行任务的效果。

简单示例如下:

import sched

schedule = sched.scheduler()  # 初始化 sched 模块的 scheduler 类
schedule.enter(10, 1, task)  # 增加调度任务
schedule.run()  # 开始调度任务

scheduler 提供了两个添加调度任务的函数:

优点:

缺点:
scheduler 中的每个调度任务只会工作一次,不会无限循环被调用。如果想重复执行同一任务, 需要重复添加调度任务即可。

import sched
import threading
from functools import wraps

s = sched.scheduler()
STOP_FLAG = threading.Event()

def repeat(interval):
    def wrapper(func):
        @wraps(func)
        def inner():
            if not STOP_FLAG.is_set():
                s.enter(interval, 0, inner)
            func()
        return inner

    return wrapper


@repeat(5)
def new_task():
    return task()

new_task()
t = threading.Thread(target=s.run)
t.start()

实现原理

当然我们仅仅学会怎么用还是不够的,不能知其然而不知其所以然。
介绍了那么多方法,那么多库,但是底层的实现逻辑都是差不多的。
一个完整的定时任务系统,有三个部分:

import sched
import time
import threading
from functools import wraps

from task import task
from typing import Callable

STOP_FLAG = threading.Event()


def timeloop(func: Callable, interval: int):
    while True:
        func()
        time.sleep(interval)


def repeat_task(func: Callable, interval: int):
    # 新建任务的前后,决定了是在任务结束后等待时间,还是固定间隔
    if not STOP_FLAG.is_set():
        t = threading.Timer(interval, repeat_task, args=(func, interval))
        t.start()
    func()


def timer_schedule(func: Callable, interval: int):
    repeat_task(func, interval)


s = sched.scheduler()


def repeat(interval):
    def wrapper(func):
        @wraps(func)
        def inner():
            if not STOP_FLAG.is_set():
                s.enter(interval, 0, inner)
            func()
        return inner

    return wrapper


@repeat(5)
def new_task():
    return task()


def sched_schedule(func: Callable, interval: int):
    func()
    t = threading.Thread(target=s.run)
    t.start()
    return t


def main(schedule):
    schedule_thread = schedule(new_task, 5)
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            STOP_FLAG.set()
            schedule_thread.join()
            break


if __name__ == "__main__":
    # main(timeloop)
    # main(timer_schedule)
    main(sched_schedule)

到此这篇关于Python中实现定时任务详解的文章就介绍到这了,更多相关Python中实现定时任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文