python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python操作线程

Python中的线程操作模块(oncurrent)

作者:springsnow

这篇文章介绍了Python中的线程操作模块(oncurrent),文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

进程是cpu资源分配的最小单元,一个进程中可以有多个线程。

线程是cpu计算的最小单元。

对于Python来说他的进程和线程和其他语言有差异,是有GIL锁。

GIL锁

GIL锁保证一个进程中同一时刻只有一个线程被cpu调度。

GIL锁,全局解释器锁。用于限制一个进程中同一时刻只有一个线程被cpu调度。
扩展:默认GIL锁在执行100个cpu指令(过期时间)。
查看GIL切换的指令个数

import sys
v1 = sys。getcheckinterval()
print(v1)

一、通过threading.Thread类创建线程

1、 创建线程的方式:直接使用Thread

from threading import Thread 
import time 

def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('nick',))
    t.start()
    print('主线程')

2、 创建线程的方式:继承Thread

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('nick')
    t.start()
    print('主线程')

二、多线程与多进程

1、 pid的比较

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    # part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主线程/主进程pid',os.getpid())

    # part2:开多个进程,每个进程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主线程/主进程pid',os.getpid())

2、 开启效率的较量

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    # 在主进程下开启线程
    t=thread(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    hello
    主线程/主进程
    '''

    # 在主进程下开启子进程
    t=Process(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    主线程/主进程
    hello
    '''

3、 内存数据的共享问题

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) # 毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('主',n) # 查看结果为0,因为同一进程内的线程之间共享进程内的数据

三、Thread类的其他方法

Thread实例对象的方法:

threading模块提供的一些方法:

1、 代码示例

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    # 在主进程下开启线程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread())
 # 主线程
    print(threading.enumerate())
 # 连同主线程在内有两个运行的线程
    print(threading.active_count())
    print('主线程/主进程')

    '''
    打印结果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主线程/主进程
    Thread-1
    '''

2、 join方法

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('nick',))
    t.start()
    t.join()
    print('主线程')
    print(t.is_alive())
'''
    nick say hello
    主线程
    False
    '''

四、多线程实现socket

import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()

        p=threading.Thread(target=action,args=(conn,))
        p.start()

五、守护线程

无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行。

1、 详细解释

2、 守护线程例

from threading import Thread
import time


def foo():
    print(123)
    time.sleep(10)
    print("end123")


def bar():
    print(456)
    time.sleep(10)
    print("end456")


t1 = Thread(target=foo)
t2 = Thread(target=bar)

t1.daemon= True #必须在t.start()之前设置
# t1.setDaemon(True)

t1.start()
t2.start()
print("main-------")
print(t1.is_alive())
# 123
# 456
# main-------
# end456

六、同步锁

1、 多个线程抢占资源的情况

from threading import Thread
import os,time
def work():
    

global n temp=n 
time.sleep(0.1) 
n=temp-1 

if __name__ == '__main__':
    n=100


    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #结果可能为99

2、同步锁的引用

对公共数据的操作

import threading

R=threading.Lock()
R.acquire()
'''
对公共数据的操作
'''
R.release()

3、实例

不加锁:并发执行,速度快,数据不安全

from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''

加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全

from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加锁的代码并发运行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加锁的代码串行运行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

七、死锁与递归锁

所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

1、 死锁

from threading import Lock as Lock
import time

mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)

mutexA.release()
mutexA.release()

解决方法:递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

2、 递归锁(可重入锁)RLock

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。

from threading import RLock as Lock
import time

mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

3、典型问题:科学家吃面

递归锁解决死锁问题

import time
from threading import Thread,RLock

fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 抢到了面条'%name)
    fork_lock.acquire()
    print('%s 抢到了叉子'%name)
    print('%s 吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 抢到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 抢到了面条' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','nick','tank']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

八、线程队列

queue队列:使用import queue,用法与进程Queue一样

当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。

1、先进先出:Queue

通过双向列表实现的

class queue.Queue(maxsize=0)

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

2、后进先出:LifoQueue

通过堆实现

class queue.LifoQueue(maxsize=0)

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

3、存储数据时可设置优先级的队列:PriorityQueue

PriorityQueue类和LifoQueue类继承Queue类然后重写了_init、_qsize、_put、_get这四个类的私有方法.

通过list来实现的。

class queue.PriorityQueue(maxsize=0)

优先队列的构造函数。maxsize是一个整数,它设置可以放置在队列中的项数的上限。一旦达到此大小,插入将阻塞,直到队列项被使用。如果maxsize小于或等于0,则队列大小为无穷大。

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

更多方法说明

九、Python标准模块——concurrent.futures

官方文档:https://docs.python.org/dev/library/concurrent.futures.html

1、介绍

concurrent.futures模块提供了高度封装的异步调用接口:

两者都实现了由抽象Executor类定义的相同接口。

ThreadPoolExecutor(线程池)与ProcessPoolExecutor(进程池)都是concurrent.futures模块下的,主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值。

通过submit返回的是一个future对象,它是一个未来可期的对象,通过它可以获悉线程的状态。

比较:

2、基本方法

3、ProcessPoolExecutor、ThreadPoolExecutor线程池

ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程中最多能同时运行的线程数目 。

使用submit函数来提交线程需要执行任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。

通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。

使用result()方法可以获取任务的返回值,查看内部代码,发现这个方法是阻塞的。

对于频繁的cpu操作,由于GIL锁的原因,多个线程只能用一个cpu,这时多进程的执行效率要比多线程高。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(
future.result())

4、过wait()判断线程执行的状态:

wait方法可以让主线程阻塞,直到满足设定的要求。

wait(fs, timeout=None, return_when=ALL_COMPLETED),wait接受3个参数,

import time
from concurrent.futures import (
    ThreadPoolExecutor, wait
)


def get_thread_time(times):
    time.sleep(times)
    return times


start = time.time()
executor = ThreadPoolExecutor(max_workers=4)
task_list = [executor.submit(get_thread_time, times) for times in [1, 2, 3, 4]]
i = 1
for task in task_list:
    print("task{}:{}".format(i, task))
    i += 1
print(wait(task_list, timeout=2.5))

# wait在2.5秒后返回线程的状态,result:
# task1:<Future at 0x7ff3c885f208 state=running>
# task2:<Future at 0x7ff3c885fb00 state=running>
# task3:<Future at 0x7ff3c764b2b0 state=running>
# task4:<Future at 0x7ff3c764b9b0 state=running>
# DoneAndNotDoneFutures(
# done={<Future at 0x7ff3c885f208 state=finished returned int>, <Future at 0x7ff3c885fb00 state=finished returned int>},
# not_done={<Future at 0x7ff3c764b2b0 state=running>, <Future at 0x7ff3c764b9b0 state=running>})
#
# 可以看到在timeout 2.5时,task1和task2执行完毕,task3和task4仍在执行中

4、map的用法

map(fn, *iterables, timeout=None),第一个参数fn是线程执行的函数;第二个参数接受一个可迭代对象;第三个参数timeout跟wait()的timeout一样,但由于map是返回线程执行的结果,如果timeout小于线程执行时间会抛异常TimeoutError。

map的返回是有序的,它会根据第二个参数的顺序返回执行的结果:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12))

 #map取代了for+submit

5、s_completed返回线程执行结果

上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断,有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用as_completed方法一次取出所有任务的结果。

import time
from collections import OrderedDict
from concurrent.futures import (
    ThreadPoolExecutor, as_completed
)


def get_thread_time(times):
    time.sleep(times)
    return times


start = time.time()
executor = ThreadPoolExecutor(max_workers=4)
task_list = [executor.submit(get_thread_time, times) for times in [2, 3, 1, 4]]
task_to_time = OrderedDict(zip(["task1", "task2", "task3", "task4"],[2, 3, 1, 4]))
task_map = OrderedDict(zip(task_list, ["task1", "task2", "task3", "task4"]))

for result in as_completed(task_list):
    task_name = task_map.get(result)
    print("{}:{}".format(task_name,task_to_time.get(task_name)))

# task3: 1
# task1: 2
# task2: 3
# task4: 4

task1、task2、task3、task4的等待时间分别为2s、3s、1s、4s,通过as_completed返回执行完的线程结果,as_completed(fs, timeout=None)接受2个参数,第一个是执行的线程列表,第二个参数timeout与map的timeout一样,当timeout小于线程执行时间会抛异常TimeoutError。

通过执行结果可以看出,as_completed返回的顺序是线程执行结束的顺序,最先执行结束的线程最早返回。

6、回调函数

Future对象也可以像协程一样,当它设置完成结果时,就可以立即进行回调别的函数。add_done_callback(fn),则表示 Futures 完成后,会调⽤fn函数。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果

到此这篇关于Python线程操作模块(oncurrent)的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文