Python使用future处理并发问题方案详解
作者:lijiachang8
从Python3.2引入的concurrent.futures模块,Python2.5以上需要在pypi中安装futures包。
future指一种对象,表示异步执行的操作。这个概念的作用很大,是concurrent.futures模块和asyncio包的基础。
网络下载的三种风格
为了高效的处理网络IO,需要使用并发,因为网络有很高的延迟,所以为了不浪费CPU周期去等待,最好再收到网络响应之前去做其他的事情。
下面有三种示例程序,
第一个程序是依序下载的,第二个是使用theadpool来自concurrent.futures模块,第三个是使用asyncio包
按照顺序下载
下面示例是依次下载
import os import time import sys import requests POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = 'downloads/' def sava_flag(img, filename): path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def show(text): print(text, end=" ") sys.stdout.flush() # 能在一行中显示 def download_many(cc_list): for cc in sorted(cc_list): image = get_flag(cc) show(cc) sava_flag(image, cc.lower() + ".gif") return len(cc_list) def main(): t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags download in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main()
打印
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags download in 24.16s
知识点:
- 按照惯例,requests不在标准库中,在导入标准库之后,用一个空行分隔开
- sys.stdout.flush() : 显示一个字符串,然后刷新sys.stdout,这样能在一行消息中看到进度。Python中正常情况下,遇到换行才会刷新stdout缓冲。
使用conrurrent.futures模块多线程下载
concurrent.futures模块的主要特色是TheadPoolExecutor和ProcessPoolExecutor类,这两个类实现的结构能分别在不同线程或者进程中执行可调用对象。
这两个类内部维护着一个工作线程池或者进程池,以及要执行的任务队列。不过,这个接口抽象的层级很高,无需关心任何实现细节。
下面展示如何使用TheadPoolExecutor.map方法,最简单的方式实现并发下载。
import os import time import sys from concurrent import futures import requests POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = 'downloads/' max_workers = 20 # 设定线程数 def sava_flag(img, filename): path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def show(text): print(text, end=" ") sys.stdout.flush() # 能在一行中显示 def download_one(cc): image = get_flag(cc) show(cc) sava_flag(image, cc.lower() + ".gif") def download_many(cc_list): works = min(len(cc_list), max_workers) # 取其中的最小值,以免创建多余的线程 with futures.ThreadPoolExecutor(works) as executor: res = executor.map(download_one, cc_list) return len(list(res)) def main(): t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags download in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main()
打印
FR IN RU ID BD JP CN VN TR CD PH NG DE ET US EG IR BR MX PK
20 flags download in 2.92s
知识点:
- 使用线程数实例化ThreadPoolExecute类。executor.__exit__方法会调用executor.shutdown(wait=True)方法,它会在所有线程执行完毕前阻塞线程。
- executor.map方法的作用于内置map函数类似,区别是在多个线程中并发调用,此map方法会返回一个生成器,因此可以迭代,获取各个函数返回的值。
- return len(list(res))这里要注意下,读取res也就是executor.map各个函数的返回值,如果线程中有异常,会在这里抛出,这与隐式调用next()函数从迭代器中获取相应的返回值一样。
使用asyncio异步下载
后续章节会介绍
future是什么
从Python3.4开始,标准库中有两个名为Future的类:concurrent.futures.Future和asyncio.Future
这两个类的作用相同:两个future类的实例都可以表示已经完成或者尚未完成的延迟计算。这与Twister引擎中的Deferred类、Tornado框架中的Future类,以及多个JavaScript库中的Promise对象类似。
future封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)。
通常情况下自己不应该创建future,而只能由并发框架concurrent.futures和asyncio实例化。
原因很简单:future表示终将发生的事情,而确定某件时间发生的唯一方式是执行的时间(顺序)已经排定。因此,只有排定把某件事情交给concurrent.futures.Executor子类处理时,才会创建concurrent.futures.Future实例。
Executor.submit()方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个future。
客户端代码不应该改变future的状态,并发框架在future表示的延迟计算结束后会改变future的状态,而我们无法控制计算何时结束。
两种future都有.done()方法,这个方法并不阻塞,返回值是布尔值,指明future链接的可调用对象是否已经执行。
客户端代码通常不会询问future是否运行结束,而是会等待通知。因
两个Future类都有.add_done_callback()方法:这个方法只有一个参数,类型是可调用对象,future运行结束后会调用此可调用对象。
还有.result()方法,如果在future运行结束后调用的haunt,这个方法在两个Future类的作用相同:返回可调用对象的结果,或者抛出异常(重新抛出执行可调用对象时抛出的异常)。可是,如果future没有运行结束,result方法在两个Future类中的行为相差很大:
对于concurrent.futures.Future实例来说,调用了f.result()方法会阻塞调用方所在的线程,直到有结果返回,此时的result方法,可以接受可选的timeout参数,如果在指定实现内future没有运行完毕,会抛出TimeoutError异常。
对于asyncio.Future.result方法,不支持设置timeout超时时间,在那个库中获取future结果最好使用yield from结构。
这两个库中有几个函数会返回future,其他函数则使用future以用户易于理解的方式实现自身。
Executor.map方法是使用future:返回值是一个迭代器,迭代器的__next__方法调用各个future的result方法,因此得到各个future的结果。
concurrent.futures.as_completed函数参数是一个列表,返回值是一个迭代器,在future运行结束后产出future。
为了理解future,使用as_completed函数,把较为抽象的executor.map调换成两个for循环:一个用户创建并排定future(使用summit方法),一个用于获取future的结果。
示例,一窥神秘的future。
...其余代码省略 def download_one(cc): image = get_flag(cc) show(cc) sava_flag(image, cc.lower() + ".gif") return cc def download_many(cc_list): cc_list = cc_list[:5] # 这次演示5个国家 future_list = [] with futures.ThreadPoolExecutor(max_workers=3) as executor: # 线程池为3,便于观察 for cc in sorted(cc_list): future = executor.submit(download_one, cc) # submit方法排定可调用对象的执行时间,然后返回一个future,表示待执行操作 future_list.append(future) print("{}: {}".format(cc, future)) res = [] for future in futures.as_completed(future_list): # as_completed函数在future都执行完毕后,返回future result = future.result() # 获取future结果 print('{} result: {!r}'.format(future, result)) res.append(result)
打印
BR: <Future at 0x3af2870 state=running>
CN: <Future at 0x3af2c90 state=running>
ID: <Future at 0x3af2ff0 state=running>
IN: <Future at 0x3aff3b0 state=pending>
US: <Future at 0x3aff410 state=pending>
CN <Future at 0x3af2c90 state=finished returned str> result: 'CN'
ID <Future at 0x3af2ff0 state=finished returned str> result: 'ID'
BR <Future at 0x3af2870 state=finished returned str> result: 'BR'
IN <Future at 0x3aff3b0 state=finished returned str> result: 'IN'
US <Future at 0x3aff410 state=finished returned str> result: 'US'
知识点:
- summit方法把一个可执行对象(函数),变为一个future对象,并且记录了执行时间,表示待执行打操作。
- futures.as_completed在所有的future执行完毕后,产出future对象。而后可以使用future.result()获取结果
- 直接打印future对象(调用future的repr()方法),会显示future的状态,例如running、pending(等待)、finished
GIL和阻塞型I/O
严格来说,上面实现的多线程并发脚本都不能实现并行下载。
使用concurrent.futures库实现的示例,都会受到GIL(Global Interpreter Lock,全局解释器锁)的限制,脚本只能在单个线程中执行。
CPython解释器本身就不是线程安全的,因此会有全局解释器锁GIL,一次只允许使用一个线程执行Python字节码。
因此一个Python进程通常不能同时使用多个CPU核心。(在Jython和IronPython中没有此限制,目前最快的PyPy解释器也存在GIL) IronPython是.net实现的
Python代码无法控制GIL,然而,标准库中所以执行阻塞型IO操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在Python语言这个层次上可以使用多线程,而IO密集型操作能从中受益:一个Python线程等待网络响应时,阻塞型IO函数会释放GIL,再运行一个线程。比如time.sleep()函数也会释放GIL。
GIL简化了CPython和C语言扩展的实现。得益于GIL,Python有很多C语言扩展。
使用concurrent.futures模块多进程
在处理CPU密集型操作时,可以使用多进程,实现真正的并行计算。
使用ProcessPoolExecutor类把任务分配给多个Python进程处理。因此如果需要做CPU密集型操作,使用这个模块多进程能绕开GIL,利用所有可用的CPU核心。
ProcessPoolExecutor和ThreadPoolExecutor类都实现了通用的Executor接口,因此使用concurrent.futures模块能轻松的把基于线程的方案转成基于进程的方案。
这两个实现Executor接口的类,唯一的区别是,ThreadPoolExecutor.__init__方法需要max_workers参数,指定线程池中线程的数量。
但是在ProcessPoolExecutor类中,这个参数是可选的,而且大多数情况下使用默认值:os.cpu_count()函数返回的CPU数量。因为对于CPU密集型操作来说,不可能要求使用超过CPU数量的进程。
经过测试,使用ProcessPoolExecutor实例下载20个国旗的时间,要比ThreadPoolExecutor要慢,主要原因是我电脑是四核八线程,八个逻辑处理器,因此限制只有4个并发下载,而使用线程池的版本有20个工作线程。
ProcessPoolExecutor的价值体现在CPU密集型操作上。比如对于加密算法上,使用ProcessPoolExecutor类派生出四个工作进程后,性能可以提高两倍。
如果使用PyPy比CPython相比,速度又能提高3.8倍。所以使用Python进行CPU密集型操作,应该试试PyPy,普遍快3.8~5.1倍。
实验Executor.map方法
若想并发运行多个可调用对象,最简单是是使用Executor.map方法。
示例,演示Executor.map方法的某些运作细节
import time from concurrent import futures def display(*args): """把参数打印前,加上时间显示""" print(time.strftime('[%H:%M:%S]'), end=" ") print(*args) def loiter(n): """开始时显示一个消息,然后休眠n秒,最后再结束的时候在显示一个消息 消息使用制表符缩进,缩进量由n值确定 loiter:徘徊,闲着,闲荡 """ msg = '{}loiter({}):doing nothing for {}s' display(msg.format('\t' * n, n, n)) time.sleep(n) msg = '{}loiter({}):done' display(msg.format('\t' * n, n)) return n * 10 # 随意返回一个结果 def main(): display('Script starting.') # 脚本开始 executor = futures.ThreadPoolExecutor(max_workers=3) results = executor.map(loiter, range(5)) # 把5个任务交个3个线程 display('results :', results) # 打印调用executor.map的结果,是一个生成器 display('waiting for individual results:') # 等待个体结果 for i, result in enumerate(results): display('result {}: {}'.format(i, result)) if __name__ == '__main__': main()
打印
[17:40:08] Script starting.
[17:40:08] loiter(0):doing nothing for 0s
[17:40:08] loiter(0):done
[17:40:08] loiter(1):doing nothing for 1s
[17:40:08] loiter(2):doing nothing for 2s
[17:40:08][17:40:08] results : loiter(3):doing nothing for 3s
<generator object Executor.map.<locals>.result_iterator at 0x0318CDB0>
[17:40:08] waiting for individual results:
[17:40:08] result 0: 0
[17:40:09] loiter(1):done
[17:40:09] loiter(4):doing nothing for 4s
[17:40:09] result 1: 10
[17:40:10] loiter(2):done
[17:40:10] result 2: 20
[17:40:11] loiter(3):done
[17:40:11] result 3: 30
[17:40:13] loiter(4):done
[17:40:13] result 4: 40
知识点:
- 示例中把5个任务交给executor(3个线程),其中前三个任务会立即开始;这是非阻塞调用。
- 在for循环中会隐式调用next(results),这个函数又会在第一个任务的future上调用future.result()方法。result方法会阻塞,直到这个future运行结束。所以这个for循环每次迭代都会阻塞,等到结果出来后,才会继续。
- 每次的打印结果都可能不一样。由于sleep函数总会释放GIL,即使是sleep(0),所以loiter(1)有可能在loiter(0)结束之前开始运行,但是这个示例中没有。三个线程是同时开始。
- executor.map的结果是一个生成器,这个操作不会阻塞。
- loiter(0)的结果result 0: 0打印没有阻塞的原因是,在for循环之前future已经执行完成,可以看到输出了done。
综上,Executor.map函数易于使用,有个特征算是优点,但也可能没用变成缺点,具体情况取决于需求:map函数返回的结果顺序于调用开始的顺序一致。
如果第一个任务生成结果用时10秒,而其他任务调用只用1秒,代码就会阻塞10秒,获取map方法返回生成器的第一个结果。在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束,所以,获取循环所有结果,阻塞的用时等于最长的任务时间。
如果必须等待获取所有结果后再处理的场景,这种行为没问题;不过,通常更常用的方式是,不管提交的顺序,只有有结果就获取。这样就要使用executor.submit方法和futures.as_completed函数结合起来使用。
executor.submit和futures.as_completed这个组合比executor.map更灵活:
- 因为submit方法能处理不同的可调用对象和参数,而executor.map只能处理参数不同的同一个可调用对象
- 此外,传给future.as_completed函数的future集合可以来自多个Executor实例
- futures.as_completed只返回已经运行结束的future
显示下载进度条
Python内置库有tqdm包,taqadum在阿拉伯语中的意思是进展。
tqdm可以在长循环中添加一个进度提示信息,用户只需要封装任意的迭代器 tqdm(iterator)
import time from tqdm import tqdm for i in tqdm(range(1000)): time.sleep(.01) 100%|██████████| 1000/1000 [00:10<00:00, 95.34it/s]
tqdm能处理任何可迭代对象,生成一个迭代器;使用这个迭代器时,显示进度条和完成全部迭代
为了计算剩余时间,tqdm函数要获取可以使用len函数的可迭代对象,或者在第二个参数中指定预期的元素数量。
例如,futures.as_completed函数的结果,就不支持len函数,只能使用tdqm的第二个参数total=来指定数量。
网络下载增加错误处理和进度条
下面的示例中负责下载一个文件的函数(download_one)中使用相同的策略处理HTTP 404错误。其他异常则向上冒泡,交给download_many函数处理。
示例,负责下载的基本函数。
def get_flag(base_url, cc): url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) resp = requests.get(url) if resp.status_code != 200: resp.raise_for_status() # 如果状态码不是200,产生一个HttpError的异常 return resp.content def download_one(cc, base_url, verbose=False): try: image = get_flag(base_url, cc) except requests.exceptions.HTTPError as exc: res = exc.response if res.status_code == 404: status = HTTPStatus.not_found msg = 'not found' else: # 如果不是404异常,向上冒泡,传给调用方 raise else: sava_flag(image, cc.lower() + '.gif') status = HTTPStatus.ok msg = 'ok' if verbose: print(cc, msg) return Result(status, cc)
示例,依序下载的download_many函数
def download_many(cc_list, base_url, verbose, max_req): """实现依序下载""" counter = collections.Counter() # 统计不同的下载状态:HTTPStatus.ok、HTTPStatus.not_found、HTTPStatus.error cc_iter = sorted(cc_list) if not verbose: cc_iter = tqdm.tqdm(cc_iter) # 如果不需要详细模式,就使用进度条展示 for cc in cc_iter: try: res = download_one(cc, base_url, verbose) except requests.exceptions.HTTPError as exc: error_msg = "HTTP error {res.status_code} - {res.reason}" error_msg = error_msg.format(res=exc.response) except requests.exceptions.ConnectionError as exc: error_msg = 'Connection error' else: error_msg = '' status = res.status if error_msg: status = HTTPStatus.error counter[status] += 1 if verbose and error_msg: print('*** Error for {}:{}'.format(cc, error_msg)) return counter
知识点:
requests.exceptions中有所有的requests相关的异常类,可以用来捕获相关异常。
如果有响应信息后,产生的异常,异常对象exc.response的status_code状态码和reason异常原因
示例,多线程下载的download_many函数
default_concur_req = 30 # 默认的线程池大小 max_concur_req = 1000 # 最大并发请求数,这是一个安全措施 def download_many(cc_list, base_url, verbose, concur_req): counter = collections.Counter() with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: to_do_map = {} for cc in sorted(cc_list): future = executor.submit(download_one, cc, base_url, verbose) # submit排定一个可调用对象的执行时间,返回一个Future实例 to_do_map[future] = cc # 把各个Future实例映射到国家代码上,在错误处理时使用 done_iter = futures.as_completed(to_do_map) # 返回一个迭代器,在future运行结束后产出future if not verbose: done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # 如果不是详细模式,就显示进度条,因为done_iter没有len函数,只能通过total参数传入 for future in done_iter: try: res = future.result() except requests.exceptions.HTTPError as exc: error_msg = 'HTTP {res.status_code} - {res.reason}' error_msg = error_msg.format(res=exc.response) except requests.exceptions.ConnectionError as exc: error_msg = "Connection error" else: error_msg = "" status = res.status if error_msg: status = HTTPStatus.error counter[status] +=1 if verbose and error_msg: cc = to_do_map[future] print('*** Error for {}:{}'.format(cc, error_msg)) return counter
知识点:
对futures.as_completed函数的惯用法:构建一个字典,把各个future映射到其他数据上,future运行结束后可能会有用。比如上述示例,把future映射到国家代码上。
线程和多进程的代替方案
对于多线程,如果futures.ThreadPoolExecutor类对某个作业来首不够灵活,可能要使用到threading模块中的组件(如Thread、Lock、Semaphore等)自行制定方案,
比如使用queue模块创建线程安全的队列,在线程之间传递数据。futures.ThreadPoolExecutor类已经封装好了这些组件。
对于CPU密集型工作来说,要启动多个进程,规避GIL。创建多个进程的最简单方式是用futures.ProcessPoolExecutor类。如果使用场景较复杂,需要更高级的工具,multiprocessing模块的API和threading模块相仿,不过作业交给多个进程处理。
到此这篇关于Python使用future处理并发问题方案详解的文章就介绍到这了,更多相关Python future处理并发内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!