python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python多线程

Python多线程与同步机制浅析

作者:alwaysrun

线程(Thread)是操作系统能够进行运算调度的最小单位;线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源

线程实现

Python中线程有两种方式:函数或者用类来包装线程对象。threading模块中包含了丰富的多线程支持功能:

Thread类

通过Thread类来处理线程,类中提供的一些方法:

函数方式

通过Thread直接构造线程,然后通过start方法启动线程:

threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)

各参数说明:

def simpleRoutine(name, delay):
    print(f"routine {name} starting...")
    time.sleep(delay)
    print(f"routine {name} finished")
if __name__ == '__main__':
    thrOne = threading.Thread(target=simpleRoutine, args=("First", 1))
    thrTwo = threading.Thread(target=simpleRoutine, args=("Two", 2))
    thrOne.start()
    thrTwo.start()
    thrOne.join()
    thrTwo.join()

继承方式

直接继承Thread,创建一个新的子类(主要实现run方法):

class SimpleThread (threading.Thread):
    def __init__(self, name, delay):
        # threading.Thread.__init__(self)
        super().__init__()
        self.name = name
        self.delay = delay
    def run(self):
        print(f"thread {self.name} starting...")
        time.sleep(self.delay)
        print(f"thread {self.name} finished")
if __name__ == '__main__':
    thrOne = SimpleThread("First", 2)
    thrTwo = SimpleThread("Second", 2)
    thrOne.start()
    thrTwo.start()
    thrOne.join()
    thrTwo.join()

同步机制

当多个线程同时修改同一条数据时可能会出现脏数据;所以,就需要线程锁,即同一时刻只允许一个线程执行操作。

同步锁Lock

threading提供了Lock和RLock(可重入锁)两个类,它们都提供了如下两个方法来加锁和释放锁:

两种使用锁的方式:

gCount = 0
def PlusOne(locker):
    global gCount
      with locker:
          gCount += 1、
def MinusOne(locker):
    global gCount
      if locker.acquire():
          gCount -= 1
          locker.release()

条件变量Condition

Condition对象内部维护了一个锁(构造时可传递一个Lock/RLock对象,否则内部会自行创建一个RLock)和一个waiting池:

Condition对象:

__init__(self,lock=None):Condition类总是与一个锁相关联(若不指定lock参数,会自动创建一个与之绑定的RLock对象);

acquire(timeout):调用关联锁的acquire()方法;

release():调用关联锁的release()方法

wait(timeout):线程挂起,直到收到一个notify通知或超时才会被唤醒;必须在已获得锁的前提下调用;

notify(n=1):唤醒waiting池中的n个正在等待的线程并通知它:

notify_all():通知所有线程。

class Producer(threading.Thread):
    def __init__(self, cond, storage):
        threading.Thread.__init__(self)
        self.cond = cond
        self.storage = storage
    def run(self):
        label = 1
        while True:
            with self.cond:
                if len(self.storage) < 10:
                    self.storage.append(label)
                    print(f"<- Produce {label} product")
                    label += 1
                    self.cond.notify(2)
                else:
                    print(f"<- storage full: Has Produced {label - 1} product")
                    self.cond.notify_all()
                    self.cond.wait()
                time.sleep(0.4)
class Consumer(threading.Thread):
    def __init__(self, name, cond, storage):
        threading.Thread.__init__(self)
        self.name = name
        self.cond = cond
        self.storage = storage
    def run(self):
        while True:
            if self.cond.acquire():
                if len(self.storage) > 1:
                    pro = self.storage.pop(0)
                    print(f"-> {self.name} consumed {pro}")
                    self.cond.notify()
                else:
                    print(f"-> {self.name} storage empty: no product to consume")
                    self.cond.wait()
                self.cond.release()
                time.sleep(1)

信号量Semaphore

信号量对象内部维护一个计数器:

threading中有Semaphore和BoundedSemaphore两个信号量;BoundedSemaphore限制了release的次数,任何时候计数器的值,都不不能大于初始值(release时会检测计数器的值,若大于等于初始值,则抛出ValueError异常)。

通过Semaphore维护生产(release一个)、消费(acquire一个)量:

# products = threading.Semaphore(0)
def produceOne(label, sem: threading.Semaphore):
    sem.release()
    print(f"{label} produce one")
def consumeOne(label, sem: threading.Semaphore):
    sem.acquire()
    print(f"{label} consume one")

通过BoundedSemaphore来控制并发数量(最多有Semaphore初始值数量的线程并发):

# runner = threading.BoundedSemaphore(3)
def runBound(name, sem: threading.BoundedSemaphore):
    with sem:
        print(f"{name} is running")
        time.sleep(1)
        print(f"{name} finished")

事件Event

事件对象内部有个标志字段,用于线程等待事件的发生:

多线程等待事件发生,然后开始执行:

def waiters(name, evt: threading.Event):
    evt.wait()
    print(f"{name} is running")
    time.sleep(1)
    print(f"{name} finished")
def starting(evt: threading.Event):
    evt.set()
    print("event is set")

屏障Barrier

屏障用于设定等待线程数量,当数量达到指定值时,开始执行:

threading.Barrier(parties, action=None, timeout=None)

屏障属性与方法:

def waitBarrier(name, barr: threading.Barrier):
    print(f"{name} waiting for open")
    try:
        barr.wait()
        print(f"{name} running")
        time.sleep(5)
    except threading.BrokenBarrierError:
        print(f"{name} exception")
    print(f"{name} finished")

GIL全局解释器锁

GIL(Global Interpreter Lock,全局解释器锁);cpython中,某个线程想要执行,必须先拿到GIL(可以把GIL看作是“通行证”)。每次释放GIL锁,线程都要进行锁竞争,切换线程,会消耗资源。

由于GIL锁的存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程),这就是为什么在多核CPU上,python的多线程效率并不高:

python在使用多线程的时候,调用的是c语言的原生线程:

到此这篇关于Python多线程与同步机制浅析的文章就介绍到这了,更多相关Python多线程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文