python使用期物处理并发教程
作者:Michael阿明
learning from 《流畅的python》
1. futures.ThreadPoolExecutor
import os import time import sys import requests POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = './' def save_flag(img, filename): # 保存图像 path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): # 获取图像 url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def show(text): # 打印信息 print(text, end=' ') sys.stdout.flush() def download_many(cc_list): for cc in sorted(cc_list): image = get_flag(cc) # 获取 show(cc) # 打印 save_flag(image, cc.lower() + '.gif') # 保存 return len(cc_list) def main(download_many): t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags downloaded in {:.2f}s' print(msg.format(count, elapsed)) # 计时信息 # ----使用 futures.ThreadPoolExecutor 类实现多线程下载 from concurrent import futures MAX_WORKERS = 20 # 最多使用几个线程 def download_one(cc): image = get_flag(cc) show(cc) save_flag(image, cc.lower() + '.gif') return cc def download_many_1(cc_list): workers = min(MAX_WORKERS, len(cc_list)) with futures.ThreadPoolExecutor(workers) as executor: # 使用工作的线程数实例化 ThreadPoolExecutor 类; # executor.__exit__ 方法会调用 executor.shutdown(wait=True) 方法, # 它会在所有线程都执行完毕 前阻塞线程 res = executor.map(download_one, sorted(cc_list)) # download_one 函数 会在多个线程中并发调用; # map 方法返回一个生成器,因此可以迭代, 获取各个函数返回的值 return len(list(res)) if __name__ == '__main__': # main(download_many) # 24 秒 main(download_many_1) # 3 秒
2. 期物
通常不应自己创建期物
只能由并发框架(concurrent.futures 或 asyncio)实例化 原因:期物 表示终将发生的事情,其 执行的时间 已经排定。因此,只有排定把某件事交给 concurrent.futures.Executor 子类处理时,才会创建 concurrent.futures.Future 实例
例如,Executor.submit() 方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象 排期,并返回一个期物
def download_many_2(cc_list): cc_list = cc_list[:5] with futures.ThreadPoolExecutor(max_workers=3) as executor: to_do = [] for cc in sorted(cc_list): future = executor.submit(download_one, cc) # executor.submit 方法排定可调用对象的执行时间, # 然后返回一个 期物,表示这个待执行的操作 to_do.append(future) # 存储各个期物 msg = 'Scheduled for {}: {}' print(msg.format(cc, future)) results = [] for future in futures.as_completed(to_do): # as_completed 函数在期物运行结束后产出期物 res = future.result() # 获取期物的结果 msg = '{} result: {!r}' print(msg.format(future, res)) results.append(res) return len(results)
输出: Scheduled for BR: <Future at 0x22da99d2d30 state=running> Scheduled for CN: <Future at 0x22da99e1040 state=running> Scheduled for ID: <Future at 0x22da99e1b20 state=running> Scheduled for IN: <Future at 0x22da99ec520 state=pending> Scheduled for US: <Future at 0x22da99ecd00 state=pending> CN <Future at 0x22da99e1040 state=finished returned str> result: 'CN' BR <Future at 0x22da99d2d30 state=finished returned str> result: 'BR' ID <Future at 0x22da99e1b20 state=finished returned str> result: 'ID' IN <Future at 0x22da99ec520 state=finished returned str> result: 'IN' US <Future at 0x22da99ecd00 state=finished returned str> result: 'US' 5 flags downloaded in 3.20s
3. 阻塞型I/O和GIL
CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL), 一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程 通常不能同时使用多个 CPU 核心
标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时 都会释放 GIL。 这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程(网络下载,文件读写都属于 IO 密集型)
4. 使用concurrent.futures模块启动进程
这个模块实现的是真正 的并行计算,因为它使用 ProcessPoolExecutor 类把工作分配给多个 Python 进程处理。 因此,如果需要做 CPU 密集型处理,使用这个模块 能绕开 GIL,利用所有可用的 CPU 核心
使用 concurrent.futures 模块能特别轻松地 把 基于线程 的方案转成 基于进程 的方案
ProcessPoolExecutor 的价值体现在 CPU 密集型 作业上
以上就是python使用期物处理并发教程的详细内容,更多关于python期物处理并发的资料请关注脚本之家其它相关文章!