Python中的线程操作模块(oncurrent)
作者:springsnow
进程是cpu资源分配的最小单元,一个进程中可以有多个线程。
线程是cpu计算的最小单元。
对于Python来说他的进程和线程和其他语言有差异,是有GIL锁。
GIL锁
GIL锁保证一个进程中同一时刻只有一个线程被cpu调度。
GIL锁,全局解释器锁。用于限制一个进程中同一时刻只有一个线程被cpu调度。
扩展:默认GIL锁在执行100个cpu指令(过期时间)。
查看GIL切换的指令个数
import sys v1 = sys。getcheckinterval() print(v1)
一、通过threading.Thread类创建线程
1、 创建线程的方式:直接使用Thread
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('nick',)) t.start() print('主线程')
2、 创建线程的方式:继承Thread
from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('nick') t.start() print('主线程')
二、多线程与多进程
1、 pid的比较
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': # part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主线程/主进程pid',os.getpid()) # part2:开多个进程,每个进程都有不同的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主线程/主进程pid',os.getpid())
2、 开启效率的较量
from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': # 在主进程下开启线程 t=thread(target=work) t.start() print('主线程/主进程') ''' 打印结果: hello 主线程/主进程 ''' # 在主进程下开启子进程 t=Process(target=work) t.start() print('主线程/主进程') ''' 打印结果: 主线程/主进程 hello '''
3、 内存数据的共享问题
from threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100 # p=Process(target=work) # p.start() # p.join() # print('主',n) # 毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100 n=1 t=Thread(target=work) t.start() t.join() print('主',n) # 查看结果为0,因为同一进程内的线程之间共享进程内的数据
三、Thread类的其他方法
Thread实例对象的方法:
isAlive()
:返回线程是否活动的。getName()
:返回线程名。setName()
:设置线程名。
threading模块提供的一些方法:
threading.currentThread()
:返回当前的线程变量。threading.enumerate()
:返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。threading.activeCount()
:返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
1、 代码示例
from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': # 在主进程下开启线程 t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) # 主线程 print(threading.enumerate()) # 连同主线程在内有两个运行的线程 print(threading.active_count()) print('主线程/主进程') ''' 打印结果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主线程/主进程 Thread-1 '''
2、 join方法
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('nick',)) t.start() t.join() print('主线程') print(t.is_alive()) ''' nick say hello 主线程 False '''
四、多线程实现socket
import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
五、守护线程
无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行。
- 对主进程来说,运行完毕指的是主进程代码运行完毕
- 对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
1、 详细解释
主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束。
主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
2、 守护线程例
from threading import Thread import time def foo(): print(123) time.sleep(10) print("end123") def bar(): print(456) time.sleep(10) print("end456") t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon= True #必须在t.start()之前设置 # t1.setDaemon(True) t1.start() t2.start() print("main-------") print(t1.is_alive()) # 123 # 456 # main------- # end456
六、同步锁
1、 多个线程抢占资源的情况
from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果可能为99
2、同步锁的引用
对公共数据的操作
import threading R=threading.Lock() R.acquire() ''' 对公共数据的操作 ''' R.release()
3、实例
不加锁:并发执行,速度快,数据不安全
from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 '''
加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
from threading import current_thread,Thread,Lock import os,time def task(): #未加锁的代码并发运行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加锁的代码串行运行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 '''
七、死锁与递归锁
所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
1、 死锁
from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
解决方法:递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
2、 递归锁(可重入锁)RLock
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。
from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
3、典型问题:科学家吃面
递归锁解决死锁问题
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 抢到了面条'%name) fork_lock.acquire() print('%s 抢到了叉子'%name) print('%s 吃面'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 抢到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 抢到了面条' % name) print('%s 吃面' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','nick','tank']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
八、线程队列
queue队列:使用import queue
,用法与进程Queue一样
当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。
1、先进先出:Queue
通过双向列表实现的
class queue.Queue(maxsize=0)
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(先进先出): first second third '''
2、后进先出:LifoQueue
通过堆实现
class queue.LifoQueue(maxsize=0)
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(后进先出): third second first '''
3、存储数据时可设置优先级的队列:PriorityQueue
PriorityQueue类和LifoQueue类继承Queue类然后重写了_init、_qsize、_put、_get这四个类的私有方法.
通过list来实现的。
class queue.PriorityQueue(maxsize=0)
优先队列的构造函数。maxsize是一个整数,它设置可以放置在队列中的项数的上限。一旦达到此大小,插入将阻塞,直到队列项被使用。如果maxsize小于或等于0,则队列大小为无穷大。
import queue q=queue.PriorityQueue() #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 结果(数字越小优先级越高,优先级高的优先出队): (10, 'b') (20, 'a') (30, 'c') '''
更多方法说明
- __init__(self, maxsize=0) :初始化队列长度,maxsize为0的时候长度为无限
- empty(self) :返回队列是否为空
- full(self) :返回队列是否为满
- qsize(self) :返回队列的大小(并不可靠)
- get(self, block=True, timeout=None) :从队头获取并删除元素,block为true:timeout为None时候,阻塞当前线程直到队列中有可用元素;timeout为非负时候,等了timeout的时间还没有可用元素时候抛出一个Empty异常;block为false:timeout为None时候,队列为空则抛出Empty异常;timeout为非负时候,等待timeout时候后没有可用元素则抛出Empty异常。
- get_nowait(self) :#返回self.get(block=False)
- put(self, item, block=True, timeout=None): 在队尾插入一个元素,block为true:timeout为None时候,阻塞当前线程直到队列中有可用位置;timeout为非负时候,等了timeout时间还没有可用位置时候抛出一个Full异常;block为false:timeout为None时候,队列没有位置则抛出Full异常;timeout为非负时候,等待timeout时候后还是没有可用位置则抛出Full异常。
- put_nowait(self, item) :返回 self.put(item, block=False)
- join(self) :阻塞当前线程直到队列的任务全部完成了
- task_done(self) :通知队列任务的完成情况,当完成时候唤醒被join阻塞的线程
九、Python标准模块——concurrent.futures
官方文档:https://docs.python.org/dev/library/concurrent.futures.html
1、介绍
concurrent.futures模块提供了高度封装的异步调用接口:
- ThreadPoolExecutor:线程池,提供异步调用
- ProcessPoolExecutor:进程池,提供异步调用
两者都实现了由抽象Executor类定义的相同接口。
ThreadPoolExecutor(线程池)与ProcessPoolExecutor(进程池)都是concurrent.futures模块下的,主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值。
通过submit返回的是一个future对象,它是一个未来可期的对象,通过它可以获悉线程的状态。
比较:
- 1、线程不是越多越好,会涉及cpu上下文的切换(会把上一次的记录保存)。
- 2、进程比线程消耗资源,进程相当于一个工厂,工厂里有很多人,里面的人共同享受着福利资源,,一个进程里默认只有一个主线程,比如:开启程序是进程,里面执行的是线程,线程只是一个进程创建多个人同时去工作。
- 3、线程里有GIL全局解锁器:不允许cpu调度
- 4、计算密度型适用于多进程
- 5、线程:线程是计算机中工作的最小单元
- 6、进程:默认有主线程 (帮工作)可以多线程共存
- 7、协程:一个线程,一个进程做多个任务,使用进程中一个线程去做多个任务,微线程
- 8、GIL全局解释器锁:保证同一时刻只有一个线程被cpu调度
2、基本方法
- submit(fn, *args, **kwargs):异步提交任务
- map(func, *iterables, timeout=None, chunksize=1):取代for循环submit的操作
- shutdown(wait=True):相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续 ,
wait=False,立即返回,并不会等待池内的任务执行完毕 ,
但不管wait参数为何值,整个程序都会等到所有任务执行完毕 ,submit和map必须在shutdown之前。 - result(timeout=None):取得结果
- add_done_callback(fn):回调函数
- done():判断某一个线程是否完成
- cancle():取消某个任务
3、ProcessPoolExecutor、ThreadPoolExecutor线程池
ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程中最多能同时运行的线程数目 。
使用submit函数来提交线程需要执行任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。
通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。
使用result()方法可以获取任务的返回值,查看内部代码,发现这个方法是阻塞的。
对于频繁的cpu操作,由于GIL锁的原因,多个线程只能用一个cpu,这时多进程的执行效率要比多线程高。
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print( future.result())
4、过wait()判断线程执行的状态:
wait方法可以让主线程阻塞,直到满足设定的要求。
wait(fs, timeout=None, return_when=ALL_COMPLETED),wait接受3个参数,
- s表示执行的task序列;
- timeout表示等待的最长时间,超过这个时间即使线程未执行完成也将返回;
- return_when表示wait返回结果的条件,默认为ALL_COMPLETED全部执行完成再返回
import time from concurrent.futures import ( ThreadPoolExecutor, wait ) def get_thread_time(times): time.sleep(times) return times start = time.time() executor = ThreadPoolExecutor(max_workers=4) task_list = [executor.submit(get_thread_time, times) for times in [1, 2, 3, 4]] i = 1 for task in task_list: print("task{}:{}".format(i, task)) i += 1 print(wait(task_list, timeout=2.5)) # wait在2.5秒后返回线程的状态,result: # task1:<Future at 0x7ff3c885f208 state=running> # task2:<Future at 0x7ff3c885fb00 state=running> # task3:<Future at 0x7ff3c764b2b0 state=running> # task4:<Future at 0x7ff3c764b9b0 state=running> # DoneAndNotDoneFutures( # done={<Future at 0x7ff3c885f208 state=finished returned int>, <Future at 0x7ff3c885fb00 state=finished returned int>}, # not_done={<Future at 0x7ff3c764b2b0 state=running>, <Future at 0x7ff3c764b9b0 state=running>}) # # 可以看到在timeout 2.5时,task1和task2执行完毕,task3和task4仍在执行中
4、map的用法
map(fn, *iterables, timeout=None),第一个参数fn是线程执行的函数;第二个参数接受一个可迭代对象;第三个参数timeout跟wait()的timeout一样,但由于map是返回线程执行的结果,如果timeout小于线程执行时间会抛异常TimeoutError。
map的返回是有序的,它会根据第二个参数的顺序返回执行的结果:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit
5、s_completed返回线程执行结果
上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断,有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用as_completed方法一次取出所有任务的结果。
import time from collections import OrderedDict from concurrent.futures import ( ThreadPoolExecutor, as_completed ) def get_thread_time(times): time.sleep(times) return times start = time.time() executor = ThreadPoolExecutor(max_workers=4) task_list = [executor.submit(get_thread_time, times) for times in [2, 3, 1, 4]] task_to_time = OrderedDict(zip(["task1", "task2", "task3", "task4"],[2, 3, 1, 4])) task_map = OrderedDict(zip(task_list, ["task1", "task2", "task3", "task4"])) for result in as_completed(task_list): task_name = task_map.get(result) print("{}:{}".format(task_name,task_to_time.get(task_name))) # task3: 1 # task1: 2 # task2: 3 # task4: 4
task1、task2、task3、task4的等待时间分别为2s、3s、1s、4s,通过as_completed返回执行完的线程结果,as_completed(fs, timeout=None)接受2个参数,第一个是执行的线程列表,第二个参数timeout与map的timeout一样,当timeout小于线程执行时间会抛异常TimeoutError。
通过执行结果可以看出,as_completed返回的顺序是线程执行结束的顺序,最先执行结束的线程最早返回。
6、回调函数
Future对象也可以像协程一样,当它设置完成结果时,就可以立即进行回调别的函数。add_done_callback(fn),则表示 Futures 完成后,会调⽤fn函数。
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
到此这篇关于Python线程操作模块(oncurrent)的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持脚本之家。