python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python schedule定时任务

Python schedule库实现定时任务的避坑指南

作者:竹林818

想把 schedule 用在一个需要长期稳定运行的生产环境里,远不是几行示例代码那么简单,这篇文章主要介绍了Python schedule库实现定时任务的踩坑和填坑全过程,感兴趣的小伙伴可以了解下

背景:一个不能停的同步服务

上个月,团队里一个老旧的内部数据同步脚本挂了,导致下游好几个报表数据延迟。领导把重构的任务交给了我,核心要求就一个:要稳定,要能7x24小时跑,不能动不动就悄无声息地停了

这个脚本原本就是写在一个死循环里,用 time.sleep(300) 来每隔5分钟同步一次数据。问题很明显:脚本本身没有监控,挂了没人知道;sleep 的时候整个进程啥也干不了;日志全混在一起,出问题了很难查。

我的第一反应就是找个成熟的定时任务库。APScheduler 功能强大但略显臃肿,Celery 又得搭消息队列。对于这个简单的、单进程的同步需求,我相中了轻量级的 schedule 库。它API简单直观,schedule.every(5).minutes.do(job) 这种写法,一看就懂。

但很快我就发现,想把 schedule 用在一个需要长期稳定运行的生产环境里,远不是几行示例代码那么简单。下面就是我一路踩坑、填坑的全过程。

问题分析:为什么简单的循环不靠谱?

一开始,我照着官方示例,写了一个最基础的版本:

import schedule
import time

def sync_data():
    print("开始同步数据...")
    # 模拟一些工作
    time.sleep(10)
    print("数据同步完成!")

# 每5分钟执行一次
schedule.every(5).minutes.do(sync_data)

while True:
    schedule.run_pending()
    time.sleep(1)

跑起来一看,好像没问题。但我马上意识到两个致命伤:

我需要的是一个非阻塞的、健壮的定时任务执行器。这意味着任务必须在独立的线程中运行,并且任何异常都不能导致主程序退出。

核心实现:构建健壮的定时任务骨架

第一步:把任务丢进线程里跑

解决阻塞问题的核心思路是:让 schedule.run_pending() 这个调度循环永远保持流畅运行,不被具体任务阻塞。所以,每个任务都应该在一个独立的线程中执行。

我创建了一个包装函数 run_threaded

import threading
import schedule
import time
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def run_threaded(job_func):
    """将任务函数放入线程中执行"""
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
    # 注意:这里我们不调用 job_thread.join()

这里有个关键细节:我们只 start() 线程,不 join()join() 会阻塞当前线程(也就是调度线程),等待工作线程结束,这又回到了老路上。我们让工作线程自己“放飞”,调度线程继续它的循环。

然后这样使用:

schedule.every(5).minutes.do(run_threaded, sync_data)

第二步:给任务加上“安全气囊”

任务在线程里跑,线程里的异常默认是不会传递到主线程的。这避免了脚本崩溃,但坏处是:任务悄无声息地失败了,我们可能毫无察觉。

所以,必须在任务函数内部做好异常捕获,并进行妥善的日志记录。我改造了任务函数:

def sync_data():
    """数据同步任务"""
    try:
        logger.info("数据同步任务开始。")
        # 这里是你的核心业务逻辑,比如调用API、读写数据库等
        time.sleep(10)  # 模拟耗时操作
        # 模拟一个可能的错误
        # 1 / 0
        logger.info("数据同步任务成功完成。")
    except Exception as e:
        # 捕获所有异常,记录错误日志,避免线程异常导致整个程序退出
        logger.error(f"数据同步任务发生严重错误: {e}", exc_info=True)

注意这个细节logger.error 中使用了 exc_info=True,这会让日志记录完整的异常堆栈信息,对于后期调试至关重要。现在,无论任务里发生什么,错误都会被记录在案,而调度器会继续淡定地运行下一个周期。

第三步:让调度循环本身也坚不可摧

即使每个任务都包好了,调度主循环 while True 里如果出问题(虽然概率极低),脚本还是会挂。所以,最外层的循环也要加保护:

def run_scheduler():
    """运行调度器的循环"""
    logger.info("定时任务调度器开始运行...")
    while True:
        try:
            schedule.run_pending()
            time.sleep(1)  # 避免CPU空转
        except KeyboardInterrupt:
            logger.info("接收到中断信号,调度器正在停止...")
            break
        except Exception as e:
            # 捕获调度器本身的未知异常,记录并继续运行
            logger.critical(f"调度器主循环发生未知异常: {e}", exc_info=True)
            time.sleep(60)  # 发生严重错误时,等待一分钟再重试循环

这里做了两件事:

第四步:管理多个任务与资源竞争

服务不止一个同步任务,还有清理临时文件、发送心跳检测等。当多个任务可能操作同一资源(比如同一个文件、同一个数据库表)时,就需要引入锁(threading.Lock)来避免竞争。

# 创建一个全局锁,用于保护共享资源
data_file_lock = threading.Lock()

def sync_data():
    try:
        logger.info("数据同步任务开始。")
        # 在操作共享文件前获取锁
        with data_file_lock:
            logger.info("获取到文件锁,开始写入数据...")
            time.sleep(5)  # 模拟文件操作
            logger.info("数据写入完成,释放文件锁。")
        logger.info("数据同步任务成功完成。")
    except Exception as e:
        logger.error(f"数据同步任务发生错误: {e}", exc_info=True)

这里有个坑:锁的使用要非常小心,只在必要的最小代码块上加锁,并且确保锁一定能被释放(使用 with 语句是很好的习惯),否则会导致死锁,其他任务会永远等待下去。

完整代码

下面是我最终用于生产环境的完整示例代码,你可以直接复制,根据你的任务修改 sync_datacleanup_temp 等函数的内容。

"""
一个健壮的、基于 schedule 库的 Python 定时任务执行器。
适用于需要 7x24 小时运行的后台服务。
"""
import schedule
import time
import threading
import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# 示例:用于保护共享资源的锁
shared_resource_lock = threading.Lock()

def run_threaded(job_func):
    """
    将任务函数放入独立线程中执行,避免阻塞主调度循环。
    Args:
        job_func: 要执行的任务函数
    """
    job_thread = threading.Thread(target=job_func, name=job_func.__name__)
    job_thread.start()
    # 重要:不在此处join,防止阻塞调度器。

def sync_data():
    """示例任务1:模拟数据同步"""
    task_name = "数据同步"
    try:
        logger.info(f"[{task_name}] 任务开始。")
        # 模拟一些网络I/O或数据库操作
        time.sleep(12)  # 假设本次执行需要12秒
        # 模拟操作共享资源
        with shared_resource_lock:
            logger.info(f"[{task_name}] 获取锁,操作共享资源...")
            time.sleep(1)
        # 模拟一个偶然的错误(取消注释来测试)
        # if datetime.now().second % 10 == 0:
        #     raise ConnectionError("模拟网络突然断开")
        logger.info(f"[{task_name}] 任务成功完成。")
    except Exception as e:
        logger.error(f"[{task_name}] 执行失败: {e}", exc_info=True)

def cleanup_temp():
    """示例任务2:模拟清理临时文件"""
    task_name = "清理临时文件"
    try:
        logger.info(f"[{task_name}] 任务开始。")
        time.sleep(3)  # 模拟清理工作
        logger.info(f"[{task_name}] 任务完成。")
    except Exception as e:
        logger.error(f"[{task_name}] 执行失败: {e}", exc_info=True)

def send_heartbeat():
    """示例任务3:发送心跳信号,证明调度器还活着"""
    try:
        # 这是一个非常快的任务,不需要放线程也行,这里为了统一也放了。
        logger.info("心跳检测:调度器运行正常。")
    except Exception as e:
        logger.error(f"心跳任务失败: {e}", exc_info=True)

def run_scheduler():
    """调度器主循环"""
    logger.info("=== 定时任务调度器启动 ===")

    # 1. 定义任务调度规则
    # 每5分钟执行一次数据同步(使用线程)
    schedule.every(5).minutes.do(run_threaded, sync_data)
    # 每小时执行一次清理
    schedule.every().hour.do(run_threaded, cleanup_temp)
    # 每30秒发送一次心跳
    schedule.every(30).seconds.do(run_threaded, send_heartbeat)

    # 2. 启动后立即运行一次某些任务(可选)
    logger.info("启动后立即执行一次清理和心跳...")
    run_threaded(cleanup_temp)
    run_threaded(send_heartbeat)

    # 3. 坚不可摧的主循环
    while True:
        try:
            schedule.run_pending()
            # 睡眠1秒,平衡CPU使用率和调度精度
            time.sleep(1)
        except KeyboardInterrupt:
            logger.info("接收到键盘中断信号,正在优雅停止调度器...")
            break
        except Exception as e:
            # 兜底:捕获调度循环自身的任何异常,记录并尝试恢复
            logger.critical(f"调度器主循环发生未预期异常,将在一分钟后重试: {e}", exc_info=True)
            time.sleep(60)

    logger.info("=== 定时任务调度器已停止 ===")

if __name__ == "__main__":
    run_scheduler()

踩坑记录

坑:任务执行时间过长导致调度延迟

坑:任务异常导致整个程序退出

坑:日志混乱,分不清是哪个任务的输出

坑:多个任务同时操作一个文件,内容错乱

小结

经过这一轮折腾,我把一个简单的 schedule 循环,加固成了一个适合生产环境的定时任务骨架。核心收获是:在自动化脚本中,任何可能出错的地方最终都会出错,必须用线程隔离任务,用异常捕获兜底,用清晰日志留痕。这个方案目前已经稳定运行了三周,没有再出现无故退出的情况。如果你有更复杂的分布式需求,这个单机方案可能不够,那时可以考虑 APScheduler 的持久化功能或者 Celery 这样的分布式任务队列了。

到此这篇关于Python schedule库实现定时任务的避坑指南的文章就介绍到这了,更多相关Python schedule定时任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文