Python并发编程之Futures模块详解
作者:郝同学的测开笔记
前言
Python是一门流行且强大的编程语言,具备灵活的异步编程能力。在并发编程中,Futures模块是Python提供的一个强大工具,它简化了异步编程的复杂性,使得编写并发代码变得更加直观和易于阅读。本文将介绍Futures的基本概念和用法,并通过实例来引导读者深入理解。
区别并发与并行
并发指的是同时处理多个任务的能力。在并发执行中,任务按照交替、间断的方式进行,看起来好像是同时执行。通过在任务之间快速切换,实现了看似同时进行的效果。并发适用于单核处理器系统,其中只有一个物理处理单元。
并行指的是真正同时执行多个任务的能力。在并行执行中,任务可以在多个物理处理单元(例如多核处理器、多个计算机节点等)上同时进行,每个任务都有自己的处理资源。并行适用于多核处理器系统,可以充分利用多个处理单元提高任务的执行效率。
简而言之, 并发指的是任务的交替执行,通过任务之间的快速切换来实现并行的假象;并行则是真正的同时执行多个任务。
以下是并发和并行的一些关键区别:
- 单元数量:并发适用于单个处理单元(如单核CPU),并行适用于多个处理单元(如多核CPU)。
- 实际执行:并发是任务交替执行,看起来像是同时进行;并行是真正同时执行多个任务。
- 物理资源:并发共享物理资源,任务之间快速切换;并行每个任务都有自己的物理资源。
- 提高性能:并行可以通过同时执行多个任务来提高整体性能,而并发主要用于提高响应性和减少等待时间。
并发,是指遇到I/O阻塞时(一般是网络I/O或磁盘I/O),通过多个线程之间切换执行多个任务(多线程)或单线程内多个任务之间切换执行的方式来最大化利用CPU时间,但同一时刻,只允许有一个线程或任务执行。适合I/O阻塞频繁的业务场景。 并行,是指多个进程完全同步同时的执行。适合CPU密集型业务场景。
Futures简介
Futures是Python标准库中concurrent.futures
模块提供的一种并发编程概念。它允许我们在主线程中定义任务,并在后台进行并发执行。通过使用Futures,我们可以异步地执行任务、获取任务的结果,以及处理异常情况,而无需显式地管理线程或进程。
Futures的基本用法
要使用Futures,首先需要导入concurrent.futures
模块:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
接下来,我们需要创建一个线程池或进程池对象,以便执行我们定义的任务。ThreadPoolExecutor
用于开启线程并执行任务,ProcessPoolExecutor
则用于开启进程。
下面是一个使用ThreadPoolExecutor
的示例,计算斐波那契数列的值:
def fibonacci(n): if n <= 1: return n else: return fibonacci(n-1) + fibonacci(n-2) def main(): start_time = time.perf_counter() with ThreadPoolExecutor(max_workers=5) as executor: futures = [executor.submit(fibonacci, i) for i in range(10)] results = [future.result() for future in as_completed(futures)] print(results) end_time = time.perf_counter() print(end_time - start_time) # 0.0010214539999999772
在上述示例中,我们使用executor.submit()
方法将任务fibonacci(i)
提交给线程池,返回一个Future对象。通过遍历所有的Future对象,并使用as_completed()
函数等待并获取任务的结果,我们可以得到每个任务的计算结果。
这里我们创建了一个线程池,总共有 2个线程可以分配使用。虽然线程的数量可以自己定义,但是线程数并不是越多越好,因为线程的创建、维护和删除也会有一定的开销。所以如果你设置的很大,反而可能会导致速度变慢。我们往往需要根据实际的需求做一些测试,来寻找最优的线程数量。
多线程与单线程的比较
import time def fibonacci(n): if n <= 1: return n else: return fibonacci(n-1) + fibonacci(n-2) def main(): start_time = time.perf_counter() for i in range(10): fibonacci(i) end_time = time.perf_counter() print(end_time - start_time) # 5.414600000000491
这是单线程的执行时间,单线程的优点是简单明了,但是明显效率低下
异步执行和获取结果
Futures提供了异步执行任务的能力。调用executor.submit()
方法时,任务会立即开始执行,并返回一个Future对象,代表该任务的未来结果。
要获取任务的结果,可以使用Future对象的result()
方法。它会阻塞主线程,直到任务完成并返回结果。另外,还可以使用add_done_callback()
方法注册一个回调函数,当任务完成时自动触发。
下面是一个使用回调函数处理任务结果的示例:
def callback(future): result = future.result() print("Task completed with result:", result) def main(): with ThreadPoolExecutor() as executor: future = executor.submit(fibonacci, 10) future.add_done_callback(callback) # Do other work here # 阻塞主线程,等待任务的完成 futures.wait([future])
在上述示例中,我们通过future.add_done_callback()
方法注册了一个回调函数。当任务完成时,回调函数会被自动调用,并传递Future对象作为参数。我们可以在回调函数中使用future.result()
获取任务的计算结果。
控制并发度
Futures允许我们通过控制并发度来管理并发执行的任务。并发度指的是同时进行的任务数量。
通过传递max_workers
参数给线程池或进程池对象,我们可以限制并发执行的任务数量。如果未指定max_workers
,则默认使用系统可用的核心数。
下面是一个控制并发度的示例,通过设置max_workers
为2,限制了同时执行的任务数量:
def main(): with ThreadPoolExecutor(max_workers=2) as executor: futures = [executor.submit(fibonacci, i) for i in range(10)] results = [future.result() for future in as_completed(futures)] print(results)
在上述示例中,我们使用max_workers=2
创建了一个最大并发度为2的线程池。这意味着最多同时执行两个任务,其余的任务会等待前面的任务完成后再执行。
错误处理
当任务执行过程中出现异常时,Futures提供了异常处理机制,使得我们可以捕获和处理任务中的异常情况。
可以通过在任务函数内部抛出异常,或者使用future.set_exception()
方法手动设置异常,来模拟任务的执行失败。
下面是一个处理任务异常的示例:
def task(): raise Exception("Task execution failed") def main(): with ThreadPoolExecutor() as executor: future = executor.submit(task) try: result = future.result() except Exception as e: print("Task execution failed:", e)
在上述示例中,我们通过在任务函数task()
中抛出异常模拟了任务执行失败的情况。在获取任务结果时,使用try-except
语句捕获异常,并进行处理。
为啥多线程每次只有一个线程执行
同一时刻,Python 主程序只允许有一个线程执行,所以 Python 的并发,是通过多线程的切换完成的。你可能会疑惑这到底是为什么呢?这里我简单提一下全局解释器锁的概念。事实上,Python 的解释器并不是线程安全的,为了解决由此带来的 race condition 等问题,Python 便引入了全局解释器锁,也就是同一时刻,只允许一个线程执行。当然,在执行 I/O 操作时,如果一个线程被 block 了,全局解释器锁便会被释放,从而让另一个线程能够继续执行。
最后
Futures是Python并发编程的一个强大工具,它简化了异步编程的复杂性,使得编写并发代码变得更加直观和易于阅读。本文介绍了Futures的基本用法,包括任务提交和执行、获取结果、控制并发度以及错误处理等方面。
再举个race condition的例子
"Race condition"是并发编程中的一个常见问题,指的是多个进程或线程之间的执行顺序不确定,导致程序的行为变得不可预测。
假设有一个共享的计数器变量 count
,初始值为0。现在有两个线程同时对 count
进行增加操作,代码如下
import threading count = 0 def increment(): global count for _ in range(1000000): count += 1 thread1 = threading.Thread(target=increment) thread2 = threading.Thread(target=increment) thread1.start() thread2.start() thread1.join() thread2.join() print("Final count:", count)
预期结果应该是2000000,即两个线程各加了1000000次,但是实际执行结果并不是这样。这就是Race condition问题。
在这个例子中,当两个线程同时执行 count += 1
操作时,它们会读取当前的 count
值,然后分别进行加一操作,最后再写回 count
。然而,由于线程之间的切换和调度是不确定的,可能在某个线程读取 count
后,还未来得及进行加一操作之前,另一个线程也已经读取了 count
,这样导致了Race condition。
对于 threading,操作系统知道每个线程的所有信息,因此它会做主在适当的时候做线程切换。很显然,这样的好处是代码容易书写,因为程序员不需要做任何切换操作的处理;但是切换线程的操作,容易出现在语句执行过程中,这样就容易出现 race condition 的情况。
而对于 asyncio,主程序想要切换任务时,必须得到此任务可以被切换的通知,这样一来也就可以避免刚刚提到的 race condition 的情况。
到此这篇关于Python并发编程之Futures模块详解的文章就介绍到这了,更多相关Python Futures内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!