python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Flask定时任务

Python Flask实现定时任务的不同方法详解

作者:Tzq@2018

在 Flask 中实现定时任务,最常用的方法是使用 APScheduler 库,本文将提供一个完整的解决方案,有需要的小伙伴可以跟随小编一起学习一下

在 Flask 中实现定时任务,最常用的方法是使用 APScheduler 库(Advanced Python Scheduler)。下面我将提供一个完整的解决方案,包含代码实现、解释和生产环境注意事项。

完整实现方案

from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from datetime import datetime

app = Flask(__name__)
app.config['SCHEDULER_JOBSTORES'] = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')
}

# 创建调度器实例
scheduler = BackgroundScheduler(
    jobstores=app.config['SCHEDULER_JOBSTORES'],
    daemon=True
)

# 定义定时任务函数
def daily_report():
    """每天上午9点执行的日报任务"""
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[日报] 执行时间: {current_time}")
    # 这里添加实际的日报生成逻辑

def every_5_minutes():
    """每5分钟执行一次的任务"""
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[5分钟任务] 执行时间: {current_time}")
    # 这里添加实际的任务逻辑

def on_startup():
    """应用启动时添加定时任务"""
    # 添加每天9:00执行的任务
    scheduler.add_job(
        id='daily_report_job',
        func=daily_report,
        trigger='cron',
        hour=9,
        minute=0,
        replace_existing=True
    )
    
    # 添加每5分钟执行的任务
    scheduler.add_job(
        id='every_5_minutes_job',
        func=every_5_minutes,
        trigger='interval',
        minutes=5,
        replace_existing=True
    )
    
    # 添加一次性任务(应用启动后10秒执行)
    scheduler.add_job(
        id='startup_check',
        func=lambda: print("应用启动完成!"),
        trigger='date',
        run_date=datetime.now() + timedelta(seconds=10)
    )

# 启动调度器
if not scheduler.running:
    scheduler.start()
    on_startup()

@app.route('/')
def home():
    return """
    <h1>Flask 定时任务示例</h1>
    <p>已启动的定时任务:</p>
    <ul>
        <li>日报任务:每天9:00执行</li>
        <li>5分钟任务:每5分钟执行一次</li>
    </ul>
    <p>查看控制台输出获取执行详情</p>
    """

@app.route('/jobs')
def list_jobs():
    """查看当前所有定时任务"""
    jobs = scheduler.get_jobs()
    job_list = [f"ID: {job.id}, 下次执行: {job.next_run_time}" for job in jobs]
    return "
".join(job_list)

@app.route('/add_job/<minutes>')
def add_job(minutes):
    """动态添加定时任务"""
    try:
        minutes = int(minutes)
        job_id = f"dynamic_job_{minutes}min"
        
        scheduler.add_job(
            id=job_id,
            func=lambda: print(f"动态添加的任务 - 每 {minutes} 分钟执行一次"),
            trigger='interval',
            minutes=minutes,
            replace_existing=True
        )
        
        return f"成功添加每 {minutes} 分钟执行的任务!"
    except ValueError:
        return "参数错误,请输入有效的分钟数"

@app.route('/remove_job/<job_id>')
def remove_job(job_id):
    """移除定时任务"""
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)
        return f"已移除任务: {job_id}"
    return "任务不存在"

if __name__ == '__main__':
    app.run(debug=True)

代码解释

1. 依赖安装

pip install flask apscheduler

2. 核心组件

BackgroundScheduler:后台线程调度器,不会阻塞主线程

SQLAlchemyJobStore:任务持久化存储,使用SQLite数据库

add_job():添加任务的核心方法

3. 任务类型

Cron任务(定时执行):

trigger='cron', hour=9, minute=0  # 每天9:00执行

间隔任务(周期性执行):

trigger='interval', minutes=5  # 每5分钟执行

一次性任务

trigger='date', run_date=datetime.now() + timedelta(seconds=10)

4. 任务管理

动态添加任务:通过路由 /add_job/<minutes> 添加新任务

任务列表查看:通过路由 /jobs 查看所有任务

任务删除:通过路由 /remove_job/<job_id> 删除任务

5. 持久化存储

使用SQLite数据库存储任务信息,确保服务器重启后任务不丢失:

jobstores={
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')
}

生产环境部署注意事项

1. 多进程问题解决方案

当使用Gunicorn等WSGI服务器时(多worker进程),每个worker都会启动调度器实例,导致任务重复执行。

解决方案:

# 在应用工厂函数中初始化调度器
def create_app():
    app = Flask(__name__)
    
    if os.environ.get('WERKZEUG_RUN_MAIN') == 'true' or not app.debug:
        # 确保只在主进程中初始化调度器
        scheduler = BackgroundScheduler(daemon=True)
        scheduler.start()
        # 添加任务...
    
    return app

2. 替代方案:Celery

对于分布式环境,推荐使用Celery:

# Celery配置示例
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每10分钟执行
    sender.add_periodic_task(600, task1.s(), name='每10分钟任务')
    
    # 每天午夜执行
    sender.add_periodic_task(
        crontab(hour=0, minute=0),
        task2.s(),
        name='每日午夜任务'
    )

@app.task
def task1():
    print("执行每10分钟任务")

@app.task
def task2():
    print("执行每日任务")

3. 错误处理与日志

def job_with_error_handling():
    try:
        # 业务代码
        pass
    except Exception as e:
        app.logger.error(f"任务执行失败: {str(e)}")
        # 发送警报邮件
        send_alert_email(f"任务失败: {str(e)}")
        
def send_alert_email(message):
    # 实现邮件发送逻辑
    pass

方案对比

特性APSchedulerCelery
复杂度简单中等
分布式支持有限优秀
持久化需要配置内置
多进程支持需要特殊处理原生支持
任务队列
适用场景单机/简单任务分布式/复杂任务

到此这篇关于Python Flask实现定时任务的不同方法详解的文章就介绍到这了,更多相关Flask定时任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之

您可能感兴趣的文章:
阅读全文