python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python超时处理方法

Python中六大超时处理方法介绍

作者:孤寒者

在 Python 中,超时处理是一项非常常见的需求,尤其是在处理阻塞任务或并发任务时,本文整理了大家常用的六大超时处理方法,感兴趣的小伙伴可以参考下

Python 中常见的超时处理方法

在 Python 中,超时处理是一项非常常见的需求,尤其是在处理阻塞任务或并发任务时。以下是几种常用的超时处理方法,每种方法适用于不同的场景。

1. signal 模块

适用场景:当需要对单个阻塞任务进行超时控制时,尤其是 CPU 密集型任务。通常用于限制一个长期运行的操作(例如复杂的计算任务)。

优点:

缺点:

适用场景:

适合在 UNIX-like 系统上对简单的阻塞任务进行超时限制,如网络请求、文件读取等。

示例:

import signal


def handler(signum, frame):
    """
    定义信号处理函数 handler,当接收到一个信号时,这个函数会被调用。在函数内部,抛出 TimeoutError 异常,表示操作超时。

    signum:表示接收到的信号编号(在本例中是 SIGALRM 信号)。
    frame:是当前的堆栈帧信息,我们在这个示例中并不使用它。
    当 handler 被调用时,它会抛出一个 TimeoutError 异常,表示任务超时。
    """
    raise TimeoutError("操作超时")


signal.signal(signal.SIGALRM, handler)
# 通过 signal.signal() 函数将 SIGALRM 信号与 handler 关联。即:当程序接收到 SIGALRM 信号时,会执行 handler 函数。
# SIGALRM 是一个定时信号,通常用于指定某个操作的超时时间。在这个例子中,下面我会设置一个定时器,让它在 5 秒后发送 SIGALRM 信号。
signal.alarm(5)
# 设置一个定时器,定时器将在 5 秒后向当前进程发出一个 SIGALRM 信号。

try:
    while True:  # 模拟长时间运行的任务
        pass
except TimeoutError:
    print("操作超时")
finally:
    signal.alarm(0)  # 取消定时器,防止程序结束后定时器仍然生效。

设置信号处理器:signal.signal(signal.SIGALRM, handler) 让程序在接收到 SIGALRM 信号时执行 handler 函数。

启动定时器:signal.alarm(5) 启动一个定时器,在 5 秒后发送 SIGALRM 信号。

模拟长时间任务:while True: pass 模拟一个无限循环,表示一个长时间运行的任务。

触发超时:在 5 秒后,SIGALRM 信号会被发送,handler 函数会被调用,抛出 TimeoutError 异常。

捕获超时异常:except TimeoutError: 捕获超时异常,并打印 “操作超时”。

取消定时器:finally: signal.alarm(0) 取消定时器,避免定时器在任务完成后继续触发。

注意:在 Windows 上会报错 AttributeError: module 'signal' has no attribute 'SIGALRM'。此方法仅适用于 UNIX-like 系统。

2. threading 模块

适用场景:适合并发执行独立任务的场景,特别是当每个任务的执行时间需要控制时。适用于多线程并发处理任务。

优点:

缺点:

适用场景:

适合任务之间相对独立,且需要并发执行的场景,如并行任务、爬虫等。

示例:

通过创建两个线程来同时执行一个长时间运行的任务和超时处理功能~

import threading
import time


def task():
    time.sleep(10)  # 模拟长时间任务
    return "任务完成"


def timeout_fun():
    print("任务超时")


timeout_time = 5
t1 = threading.Thread(target=task)
t2 = threading.Timer(timeout_time, timeout_fun)
# 创建一个定时器 t2,它会在 5 秒后执行 timeout_fun() 函数。如果任务在 5 秒内未完成,timeout_fun 就会被调用,打印超时信息。

t1.start()
t2.start()

t1.join()
# 会让主线程等待 t1 线程完成执行。如果 t1 在 10 秒内完成,主线程才会继续执行下一步。如果 t1 超过了 10 秒,主线程会继续等待。
t2.cancel()  
# 取消定时器 t2,如果 t1 任务在 5 秒内完成,定时器 t2 会被取消,防止超时处理函数 timeout_fun() 被触发。

任务线程启动:t1 开始执行 task(),模拟长时间运行的任务。

定时器启动:t2 启动并在 5 秒后触发超时处理。

任务超时处理:如果任务 t1 在 5 秒内没有完成,定时器 t2 会触发并执行 timeout_fun(),打印 “任务超时”。

任务完成:如果任务在 5 秒内完成,t2.cancel() 会被调用,取消超时处理。

3. concurrent.futures 模块

适用场景:适合处理多个并发任务的超时控制,尤其是涉及 I/O 密集型任务时。该模块简化了线程池和进程池的管理,适合有多个任务并发执行的情况。

优点:

缺点:

适用场景:

当有多个并发任务需要执行,并且每个任务的超时时间需要控制时,可以使用此模块。

示例:

import concurrent.futures
import time


def task():
    time.sleep(10)  # 模拟长时间任务
    return "任务完成"


with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)
    # submit() 方法异步提交任务并返回一个 Future 对象,Future 对象提供了管理任务的接口,比如获取任务结果或设置超时等。
    try:
        result = future.result(timeout=5)
        # 用来获取 task() 函数的执行结果。timeout=5 表示最多等待 5 秒钟,如果在 5 秒内任务没有完成,就会抛出 TimeoutError 异常。
        # 如果 task() 在 5 秒内完成,future.result() 会返回任务的返回值(即 "任务完成"),然后打印出结果。
        print(result)
    except concurrent.futures.TimeoutError:
        print("任务超时")

4. asyncio 模块

适用场景:适合 I/O 密集型任务,特别是需要并发处理的异步任务。适用于需要高并发的场景,如异步网络请求、文件 I/O、数据库操作等。

优点:

缺点:

适用场景:

当任务是 I/O 密集型且需要高并发时(如处理大量并发请求或文件操作),asyncio 是理想的选择。

示例:

import asyncio


async def task():
    await asyncio.sleep(10)
    # 模拟一个长时间运行的异步任务,asyncio.sleep(10) 会让当前任务挂起 10 秒,在这段时间内不会占用 CPU 资源。类似于 time.sleep(),但它是非阻塞的,因此可以在等待时继续执行其他任务。
    return "任务完成"


async def main():
    try:
        result = await asyncio.wait_for(task(), timeout=5)
        # asyncio.wait_for() 用来执行 task() 并设置超时。如果 task() 在指定的时间内(这里是 5 秒)完成,wait_for 会返回 task() 的结果(即 "任务完成")。
        print(result)
    except asyncio.TimeoutError:
        print("任务超时")


asyncio.run(main())
# 用来启动事件循环并执行 main() 协程。asyncio.run() 会创建一个新的事件循环,运行传入的协程,直到协程执行完毕并返回结果。这个方法会自动管理事件循环的创建和关闭。

5. eventlet

适用场景:适用于需要处理高并发的 I/O 密集型任务,特别是在需要大量并发请求时。适合用来构建高并发的 Web 服务器或网络爬虫。

优点:

缺点:

适用场景:

高并发 I/O 密集型任务,尤其是需要在同一线程中处理大量并发请求的场景。

示例:

import eventlet


def long_task():
    eventlet.sleep(10)
    # 通过 eventlet.sleep(10) 模拟一个阻塞的操作,实际上是让出当前绿色线程 10 秒钟。
    # 注意:eventlet.sleep() 并不是真的让程序进入休眠,它让出控制权,使得其他绿色线程可以执行。这是 eventlet 模拟并发的方式。
    return "任务完成"


def task_with_timeout():
    pool = eventlet.GreenPool()  # 创建绿色线程池
    result = pool.spawn(long_task)
    # spawn() 方法会异步地执行 long_task,并返回一个 GreenThread 对象。这个对象表示正在运行的绿色线程。
    try:
        # wait() 方法会阻塞当前线程,直到绿色线程执行完毕并返回结果。它的 timeout 参数用于设置最大等待时间,这里设置为 5 秒。如果绿色线程在 5 秒内没有完成,wait() 方法会抛出 eventlet.timeout.Timeout 异常。
        return result.wait(timeout=5)  # 设置超时时间为5秒
    except eventlet.timeout.Timeout:
        return "任务超时"


print(task_with_timeout())

6. func-timeout

适用场景:当你需要限制某个单独函数的执行时间,尤其是任务本身可能是阻塞型的。

优点:

缺点:

适用场景:

当你只需要对某个特定函数进行超时控制时,func-timeout 非常方便。

示例:

from func_timeout import func_timeout, FunctionTimedOut


def long_task():
    import time
    time.sleep(10)  # 模拟长时间任务
    return "任务完成"


try:
    result = func_timeout(5, long_task)  # 设置超时时间为5秒
    print(result)
except FunctionTimedOut:
    print("任务超时")

总结对比:

方法适用场景优点缺点
signal单一阻塞任务的超时简单直接,适用于 UNIX-like 系统不支持多线程,无法处理并发任务,仅适用于 UNIX-like 系统
threading多线程任务并发执行灵活支持多线程,适合独立任务线程管理复杂,开销较大,可能出现资源竞争
concurrent.futures多并发任务并行执行高层次接口,线程池/进程池管理简洁,支持超时控制对任务较少时可能过于复杂,线程和进程开销较大
asyncio异步 I/O 密集型任务高效处理 I/O 密集型任务,适合大量并发异步编程模型有学习曲线,不适合 CPU 密集型任务
Eventlet高并发 I/O 密集型任务高效的绿色线程管理,低资源消耗不支持 CPU 密集型任务,额外安装依赖
func-timeout单独函数的超时控制极简接口,快速实现超时控制仅适用于单个函数,无法处理并发任务

到此这篇关于Python中六大超时处理方法介绍的文章就介绍到这了,更多相关Python超时处理方法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文