python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python模块multiprocessing & 多进程并发

Python模块multiprocessing & 实现多进程并发方式

作者:一只勤劳的耗子

文章介绍了Python的multiprocessing模块,用于实现多进程编程,涵盖了Process、Queue、Pipe、Pool等主要类及其方法,以及进程管理、进程间通信、进程同步等内容,强调了多进程的优势和应用场景,并提供示例代码帮助理解

简介

multiprocessing模块是Python标准库中提供的一个用于实现多进程编程的模块。它基于进程而不是线程,可以利用多核CPU的优势,提高程序的执行效率,同时也可以实现进程间通信和数据共享。

1. 参数说明

1.1.Process(控制进程)

用于创建子进程对象,必须指定要执行的目标函数。Process(target = [函数名])

语法

multiprocessing.Process(
    target   #必选参数,表示进程所执行的目标函数。
    group    #预留参数,不需要使用。
    name     #指定子进程的名称,通过 [子进程].name 获取
    args     #指定子进程(函数)需要传递的参数,以元组方式传递,例如:args=('A', 'B', 'C')
    kwargs   #指定子进程(函数)需要传递的参数,以字典方式传递,例如:args={'name':'xiaowang', 'age':18}
    daemon   #将子进程设置为守护进程(True|False)。
)

所有方法如下

p = multiprocessing.Process(target = xxx)

    p.run       #表示进程在启动后要执行的方法,需要应用程序开发人员重写。
    p.start     #启动进程。
    p.join      #等待进程终止。
    p.is_alive  #判断进程是否存活。
    p.ident     #获取子进程PID
    p.name      #获取子进程名称
    p.kill      #强制结束子进程。
    p.terminate #强制结束子进程。
    p.close     #关闭与进程关联的所有管道和文件。
    authkey     #返回用于身份验证的进程通信的密钥。

1.2.Queue(进程通信)

用于实现进程间通信的队列,支持多个进程向同一个队列中读写数据。

参数选项

multiprocessing.Queue([maxsize])

所有方法如下

'''向消息队列发送信息'''
Queue.put(item[, block[, timeout]])
    #如果block参数是True,而且队列已满,那么程序就会在队列有空间之前停滞等待。
    #如果block参数是False,并且队列已满,那么就会引发Full异常。
    #如果给出了可选参数timeout,它会阻塞timeout秒。
'''向消息队列发送信息,如果队列已满,会引发Full异常,'''
Queue.put_nowait(item)
    #等同于Queue.put(item, False)。

'''获取队列中元素并删除'''
Queue.get([block[, timeout]])
    #如果队列为空,且block参数为True,那么程序会一直停滞等待,或者阻塞timeout秒。
    #如果block参数是False,而且队列为空,那么就会引发Empty异常。
'''获取队列中元素并删除,如果队列为空则引发Empty异。'''
Queue.get_nowait()

'''关闭进程间通信通道,以防止有些进程被阻塞,从而导致程序死锁或者内存泄露'''
Queue.close()

'''判断队列是否为空'''
Queue.empty()    #为空返回True,不为空返回False。
'''判断队列是否已满'''
Queue.full()     #队列已满返回True,未满返回False。

'''获取队列中的元素个数'''
Queue.qsize()    #注意:此方法不可靠,因为在 Queue.put() 和 Queue.get() 方法的过程中仍然可能发生改变。

1.3.Pipe(管道通信)

创建进程间通信管道,支持两个进程之间的通信。

语法

multiprocessing.Pipe([duplex])

所有方法如下

'''向管道中写入消息'''
Pipe.send('消息')    #如果发送失败会抛出 BrokenPipeError 异常。
'''从管道中读取数据,会阻塞直到有数据可读'''
Pipe.recv():         #如果管道已关闭,则会返回一个 EOFError 异常。

'''关闭管道'''
Pipe.close()

'''返回管道的文件描述符'''
Pipe.fileno()

'''判断读取管道是否阻塞(如果管道可读或者关闭,返回True)'''
Pipe.poll([timeout]) #如果设置了 timeout,则会在指定时间内返回。

'''二进制数据-向管道中写入消息'''
Pipe.send_bytes(buf[, offset[, size]]) #与send一样功能,但是接受的参数为二进制字符串。
'''二进制数据-从管道中读取数据'''
Pipe.recv_bytes([maxlength])           #与recv类似,但是返回的是二进制字符串。

1.4.Pool(进程池)

语法

multiprocessing.Pool(
    processes         #可选参数,指定进程池中的线程数(默认CPU最大数)
    initializer       #可选参数,指定每个工作进程启动时要调用的函数。默认值为 None。
    initargs          #可选参数,指定传递给初始化器函数的参数元组。默认值为 ()。
    maxtasksperchild  #可选参数,限制每个工作进程可以执行的任务数量,然后被终止,以避免内存泄漏问题。默认值为 None,表示进程将一直存在。
)

所有方法如下

Pool.[方法]

'''进程池中同步执行函数, 类似于 func(*args, **kwds) 表达式'''
Pool.apply(func, args=(), kwds={})    #如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。

'''在进程池中异步执行函数'''
apply_async(func, args=(), callback=None)  #支持回调函数。

'''进程池中同步执行函数,将执行结果存储于列表中返回, 类似于 map(func, iterable) 表达式'''
Pool.map(func, iterable, chunksize=None)  #返回一个包含结果的列表。如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。

'''进程池中异步执行函数,并返回生成器对象, 可以逐个获取函数执行结果'''
Pool.imap(func, iterable, chunksize=1)  #这可以在内存有限的情况下处理大型数据集。如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。

'''类似于 imap() 函数,但是结果不保证按照迭代器的顺序产生'''
Pool.imap_unordered(func, iterable, chunksize=1)  #如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。

'''关闭进程池'''
Pool.close()

'''等待所有进程完成'''
Pool.join()

'''强制关闭所有进程'''
Pool.terminate()

1.5.Lock(进程锁)

语法

multiprocessing.Lock(locking = True)
    locking=True:使用底层操作系统提供的锁机制,例如 POSIX 信号量或 Windows 临界区。这是默认值。
    locking=False:会更快地获取和释放锁,但是在一些平台上可能会出现未定义的行为。

所有方法如下

'''获取锁'''
Lock.acquire(block=True, timeout=None)  #如果 block=True,则会阻塞直到获取到锁,否则立即返回。如果 timeout 不是 None,则在超时之前没有获取到锁就会抛出 TimeoutError 异常。如果锁已经被当前进程持有,则会抛出 AssertionError 异常。
'''释放锁'''
Lock.release()  #如果锁没有被当前进程持有,则会抛出 AssertionError 异常。

Lock.locked()            #返回当前锁是否被持有的布尔值。
Lock.notify(n=1)         #唤醒 wait() 方法中等待锁的 n 个线程。
Lock.notify_all()        #唤醒 wait() 方法中等待锁的所有线程。
Lock.wait(timeout=None)  #等待锁。如果锁已经被释放,则会立即返回。如果锁仍然被持有,则会释放锁,并阻塞直到其他线程唤醒该线程或者超时,则会抛出 TimeoutError 异常。

2. 进程管理

2.1. 判断子进程状态

通过is_alive 判断某个子进程是否存活

import multiprocessing
from time import sleep

# 定义一个函数(作为子进程)
def proc():
    print('=========== 子进程开始运行 ===========')
    sleep(3)
    print('=========== 子进程运行结束===========')

if __name__ == '__main__':
    # 将函数proc定义为子进程
    p = multiprocessing.Process(target=proc)

    # 启动子进程
    p.start()

    # 判断子进程是否结束
    while p.is_alive():
        print('[CHECK] 子进程运行中')
        sleep(1)
    else:
        print('子进程已死亡!')

结果

[CHECK] 子进程运行中
=========== 子进程开始运行 ===========
[CHECK] 子进程运行中
[CHECK] 子进程运行中
[CHECK] 子进程运行中
=========== 子进程运行结束===========
子进程已死亡!

2.2. 获取子进程信息

获取子进程名称和PID

import multiprocessing

def proc1():
    '''模仿一个子进程'''
    pass

def proc2():
    '''模仿一个子进程'''
    pass

if __name__ == '__main__':
    # 创建2个子进程
    p1 = multiprocessing.Process(target=proc1, name='【自定义的p1】')
    p2 = multiprocessing.Process(target=proc2)

    # 启动进程1
    p1.start()
    # 启动进程2
    p2.start()

    # 获取子进程的名称和PID
    print(f'进程1的名称为:{p1.name}, PID为:{p1.ident}')
    print(f'进程2的名称为:{p2.name}, PID为:{p2.ident}')

结果

进程1的名称为:【自定义的p1】, PID为:20136
进程2的名称为:Process-2, PID为:15816

2.3. 子进程执行多任务

实现简单的多任务执行

import multiprocessing
import time

def progress1():
    '''模仿一个子进程'''
    for i in range(1,6):
        print(f'[{i}/5] 这是进程1,每隔1s输出一次...')
        time.sleep(1)

def progress2():
    '''模仿一个子进程'''
    for i in range(1,5):
        print(f'[{i}/4] 这是进程2,每隔2s输出一次...')
        time.sleep(2)

if __name__ == '__main__':
    # 创建2个子进程
    p1 = multiprocessing.Process(target=progress1)
    p2 = multiprocessing.Process(target=progress2)

    # 启动进程1
    p1.start()
    # 启动进程2
    p2.start()

    # 程序执行其他事情
    time.sleep(1)
    print('=============== 结束 =============== ')

输出结果(主进程并没有去等待子进程结束,直接做其他事)

[1/5] 这是进程1,每隔1s输出一次...
[1/4] 这是进程2,每隔2s输出一次...
=============== 结束 =============== 
[2/5] 这是进程1,每隔1s输出一次...
[3/5] 这是进程1,每隔1s输出一次...
[2/4] 这是进程2,每隔2s输出一次...
[4/5] 这是进程1,每隔1s输出一次...
[3/4] 这是进程2,每隔2s输出一次...
[5/5] 这是进程1,每隔1s输出一次...
[4/4] 这是进程2,每隔2s输出一次...

使用 join 等待某个子进程结束再运行主进程

import multiprocessing
import time

def progress1():
    '''模仿一个子进程'''
    for i in range(1,6):
        print(f'[{i}/5] 这是进程1,每隔1s输出一次...')
        time.sleep(1)

def progress2():
    '''模仿一个子进程'''
    for i in range(1,5):
        print(f'[{i}/4] 这是进程2,每隔2s输出一次...')
        time.sleep(2)

if __name__ == '__main__':
    # 创建2个子进程
    p1 = multiprocessing.Process(target=progress1)
    p2 = multiprocessing.Process(target=progress2)

    # 启动进程1
    p1.start()
    # 启动进程2
    p2.start()

    # 等待子进程结束
    p1.join()
    p2.join()

    # 程序执行其他事情
    print('=============== 结束 =============== ')

结果

[1/5] 这是进程1,每隔1s输出一次...
[1/4] 这是进程2,每隔2s输出一次...
[2/5] 这是进程1,每隔1s输出一次...
[3/5] 这是进程1,每隔1s输出一次...
[2/4] 这是进程2,每隔2s输出一次...
[4/5] 这是进程1,每隔1s输出一次...
[5/5] 这是进程1,每隔1s输出一次...
[3/4] 这是进程2,每隔2s输出一次...
[4/4] 这是进程2,每隔2s输出一次...
=============== 结束 =============== 

将 进程1(p1) 设置为守护进程(随主进程退出而退出)

import multiprocessing
import time

def progress1():
    '''模仿一个子进程'''
    for i in range(1,6):
        print(f'[{i}/5] 这是进程1,每隔1s输出一次...')
        time.sleep(1)

def progress2():
    '''模仿一个子进程'''
    for i in range(1,5):
        print(f'[{i}/4] 这是进程2,每隔2s输出一次...')
        time.sleep(2)

if __name__ == '__main__':
    # 创建2个子进程
    p1 = multiprocessing.Process(target=progress1, name='【自定义的p1】', daemon=True)
    p2 = multiprocessing.Process(target=progress2)

    # 启动进程1
    p1.start()
    # 启动进程2
    p2.start()

    # 程序执行其他事情
    time.sleep(2)
    print('====================== 结束 ======================')
    print(f'进程1的名称为:{p1.name}, PID为:{p1.ident}')
    print(f'进程2的名称为:{p2.name}, PID为:{p2.ident}')
    print('====================== 结束 ======================')

结果

[1/5] 这是进程1,每隔1s输出一次...
[1/4] 这是进程2,每隔2s输出一次...
[2/5] 这是进程1,每隔1s输出一次...
====================== 结束 ======================
进程1的名称为:【自定义的p1】, PID为:21252
进程2的名称为:Process-2, PID为:21253
====================== 结束 ======================
[2/4] 这是进程2,每隔2s输出一次...
[3/4] 这是进程2,每隔2s输出一次...
[4/4] 这是进程2,每隔2s输出一次...

2.4. 运行多个并发

通过循环的方式去构造多个并发

import multiprocessing
from time import sleep

class MyClass(object):
    def __init__(self, thread_num=1, sleep_proc=None):
        # thread_num表示进程数,sleep_proc表示是否等待退出
        self.thread_num = thread_num
        self.sleep_proc = sleep_proc

    def proc(self):
        # 定义一个并发的子进程
        for i in range(1,4):
            print(f'[{i}/3] 我是一个子进程')
            sleep(1)

    def call_proc(self):
        # 调用子进程
        processes = []
        # 循环调用多个子进程
        for num in range(self.thread_num):
            # 定义子进程属性
            p = multiprocessing.Process(target=MyClass().proc)
            # 启动子进程
            p.start()
            # 将循环的子进程放入列表,用于后面的等待退出
            processes.append(p)

        # 判断是否等待子进程结束
        if self.sleep_proc is True:
            for p in processes:
                p.join()

if __name__ == '__main__':
    # 指定2个并发,并等待子进程结束
    mc = MyClass(2,True)
    mc.call_proc()

    print('=============== 结束 ===============')

结果

[1/3] 我是一个子进程
[1/3] 我是一个子进程
[2/3] 我是一个子进程
[2/3] 我是一个子进程
[3/3] 我是一个子进程
[3/3] 我是一个子进程
=============== 结束 ===============

2.5. 强制杀死子进程

通过 terminate 或者 kill 强制杀死子进程

import multiprocessing
from time import sleep

# 定义2个函数(作为子进程)
def proc1():
    print('=========== 子进程1开始运行 ===========')
    sleep(10)
    print('=========== 子进程1运行结束===========')

def proc2():
    print('=========== 子进程2开始运行 ===========')
    sleep(10)
    print('=========== 子进程2运行结束===========')

if __name__ == '__main__':
    # 将函数定义为子进程
    p1 = multiprocessing.Process(target=proc1)
    p2 = multiprocessing.Process(target=proc2)

    # 启动子进程
    p1.start()
    p2.start()

    # 3秒后手动杀死子进程
    sleep(3)
    p1.kill()       #使用kill杀死子进程
    p2.terminate()  #使用terminate杀死子进程
    print('子进程p1、p2已被强制杀死!')

3. 进程间的通信

进程间通信(IPC,Interprocess communication)是一组编程接口,让程序员能够协调不同的进程,使之能在一个操作系统里同时运行,并相互传递、交换信息。这使得一个程序能够在同一时间里处理许多用户的要求。因为即使只有一个用户发出要求,也可能导致一个操作系统中多个进程的运行,进程之间必须互相通话。IPC接口就提供了这种可能性。

python 的 multiprocessing 模块中,可以使用 Queue、Pipe、Manager 等数据结构实现进程间通信(IPC),也就是进程之间交换数据。进程间通信是多进程编程中非常重要和常见的一部分,通常用于在多个进程之间共享并传递信息、数据或任务结果。

3.1. Queue 进程通信

from multiprocessing import Process, Queue
from time import sleep


def proc1(q):
    '''这是一个发送消息的函数'''
    msgs = ["香蕉", "苹果", "水蜜桃"]
    for msg in msgs:
        # 将迭代对象放入通信队列
        q.put(msg)
        # 打印当前迭代的内容
        print(f"[进程1] 发送信息({msg})")
        sleep(1)
    # 迭代完成后关闭通信
    q.close()


def proc2(q):
    '''这是一个接收消息的函数'''
    while True:
        try:
            # 删除通信队列中的一个元素
            msg = q.get(block=False)
            print(f"[进程2] 收到信息({msg}), 并删除该信息")
        except:
            q.close()   # 通信完成后关闭
            break
        sleep(1)


if __name__ == '__main__':
    # 使用Queue方法通信
    q = Queue()
    # 定义子进程属性,将 Queue 方法传入子进程
    p1 = Process(target=proc1, args=(q,))
    p2 = Process(target=proc2, args=(q,))

    # 启动2个子进程
    p1.start()
    p2.start()

    # 等待2个子进程结束
    p1.join()
    p2.join()

    print("=============== 结束 ===============")

逻辑视图

3.2.Queue控制多个子进程

若消息队列为空,继续运行子进程;若消息队列不为空,停止子进程。由控制子进程决定

from multiprocessing import Process,Queue
from time import sleep


class MyClass(object):
    def __init__(self, time):
        self.time = time    #指定n秒后退出子进程
        self.q = Queue()    #将Queue赋值给公共方法

    def sed_msgs(self):
        '''该方法决定子进程是否退出'''
        sleep(self.time)    #按指定时间休眠
        self.q.put('over')  #休眠后向消息队列发送一条消息
        self.q.close()      #关闭通信

    def proc1(self):
        '''这是一个子进程,当消息队列不为空则停止运行'''
        while self.q.empty():
            print('[进程1] 执行当前任务...')
            sleep(1)
        self.q.close()

    def proc2(self):
        '''这是一个子进程,当消息队列不为空则停止运行'''
        while self.q.empty():
            print('[进程2] 执行当前任务...')
            sleep(1)
        self.q.close()

if __name__ == '__main__':
    mc = MyClass(3)     #给定休眠时间3s

    # 定义子进程
    s1 = Process(target=mc.sed_msgs)
    p1 = Process(target=mc.proc1)
    p2 = Process(target=mc.proc2)
    communication = [s1, p1, p2]

    # 启动所有子进程
    for s in communication:
        s.start()

    # 等待所有子进程结束
    for s in communication:
        s.join()

    print('====================== 结束 ======================')

逻辑视图

3.3. Pipe 管道通信

Pipe()支持多个进程间的管道通信。管道可以被多个进程访问,但是一次只能有一个进程对管道进行操作。

Pipe()赋值给两个对象:p1,p2 =Pipe(),p1是发送消息的方法,p2是接收消息的方法。

定义2个进程,分别向管道中发送消息和接收消息

from multiprocessing import Pipe, Process

class MyClass(object):
    '''定义2个管道和2个进程,相互发送和接收消息'''
    def __init__(self):
        # parent_conn表示发送消息,child_conn表示接收消息
        self.parent_conn1, self.child_conn1 = Pipe()
        self.parent_conn2, self.child_conn2 = Pipe()

    def proc1(self):
        # 向管道1中发送消息
        self.parent_conn1.send('苹果')
        # 接收管道2中的消息
        received_message = self.child_conn2.recv()
        print(f'[进程1] 收到的消息是:{received_message}')
        # 关闭管道1
        self.parent_conn1.close()

    def proc2(self):
        # 接收管道1中的消息
        received_message = self.child_conn1.recv()
        print(f'[进程2] 收到的消息是:{received_message}')
        # 向管道2中发送消息
        self.parent_conn2.send('香蕉')
        # 关闭管道2
        self.parent_conn2.close()

if __name__ == '__main__':
    mc = MyClass()

    # 创建子进程
    p1 = Process(target=mc.proc1)
    p2 = Process(target=mc.proc2)

    # 启动子进程
    p1.start()
    p2.start()

    # 等待子进程结束
    p1.join()
    p2.join()

    print('=================== 结束 ===================')

说明:

进程1向管道1中发送一条消息(苹果),发送完成之后接收管道2消息,如果没有则等待。

进程2接收管道1的消息并输出,再向管道2中发送一条消息(香蕉)。

4. 进程池

4.1. 同步与异步并发区别

同步的简单代码如下

from multiprocessing import Pool
from time import sleep

def proc1():
    '''子进程1'''
    for i in range(2):
        print(f'[进程1] 执行{i}...')
        sleep(1)

def proc2():
    '''子进程2'''
    for i in range(2):
        print(f'[进程2] 执行{i}...')
        sleep(1)

if __name__ == '__main__':
    # 定义进程池
    pool = Pool()
    # 同步执行2个子进程
    pool.apply(proc1)
    pool.apply(proc2)
    # 关闭和等待子进程结束
    pool.close()
    pool.join()

使用pool.apply ([函数名]) 定义同步执行:先执行 proc1,再执行 proc2

使用异步 pool.apply_async ([函数名])(将上述代码 apply 修改为 apply_async 即可)

    # 异步执行2个子进程
    pool.apply_async (proc1)
    pool.apply_async (proc2)

其结果为:proc1 和 proc2 同时执行

4.2. 异步调用多个子进程

进程池异步并发的基本使用方法

'''定义子进程'''
def start_func():
    pass
def proc1():
    pass
def proc2():
    pass

if __name__ == '__main__':
    # 定义进程池,设置属性
    pool = Pool(processes=1, maxtasksperchild=2, initializer=start_func)
    # 异步启动子进程(子进程为指定的某个函数)
    pool.apply_async(proc1)
    pool.apply_async(proc2)
    # 关闭和等待子进程结束
    pool.close()
    pool.join()

进程池中有4个子进程,仅使用2个并发(A、B、C、D,先执行AB,再执行CD)

from multiprocessing import Pool
from time import sleep

def start_func():
    print('启动前此函数')

def proc1():
    '''子进程'''
    for i in range(2):
        print(f'[进程1] 执行{i}...')
        sleep(1)

def proc2():
    '''子进程'''
    for i in range(2):
        print(f'[进程2] 执行{i}...')
        sleep(1)

def proc3():
    '''子进程'''
    for i in range(2):
        print(f'[进程3] 执行{i}...')
        sleep(1)

def proc4():
    '''子进程'''
    for i in range(2):
        print(f'[进程4] 执行{i}...')
        sleep(1)

if __name__ == '__main__':
    # 定义进程池,设置属性
    pool = Pool(processes=2, initializer=start_func)
    func_list = [proc1, proc2, proc3, proc4]
    for i in func_list:
        pool.apply_async(i)
    # 关闭和等待子进程结束
    pool.close()
    pool.join()

结果如下:

如果将processes 设置为 4,那么4个进程将会同时进行

4.3. 进程池的高并发

使用 map(同步) 执行高并发。会自动等待子进程完成后,才会进行执行主进程。

from multiprocessing import Pool

def proc(num):
    print(f'我是子进程')

if __name__ == '__main__':
    # 定义线程池,设置并发数为5
    pool = Pool(5)
    # 使用map向函数传入多个参数(每传入一个参数则会调用一次函数,同时调用的数量由进程数决定)
    pool.map(proc, range(5))
    # 关闭进程池
    pool.close()
    print('========== 结束 ==========')

使用 imap(异步) 执行高并发。调度子进程运行后不会等待,继续执行主进程任务。若主进程运行结束,则子进程不论是否运行完成都将强制结束。

from multiprocessing import Pool

def proc(num):
    print(f'我是子进程')

if __name__ == '__main__':
    # 定义线程池,设置并发数为5
    pool = Pool(5)
    # 使用map向函数传入多个参数(每传入一个参数则会调用一次函数,同时调用的数量由进程数决定)
    pool.imap(proc, range(5))
    # 关闭进程池
    pool.close()
    print('========== 结束 ==========')

这样的好处是不会影响主进程其他任务的执行,如果需要等待则使用 join

from multiprocessing import Pool

def proc(num):
    print(f'我是子进程')

if __name__ == '__main__':
    # 定义线程池,设置并发数为5
    pool = Pool(5)
    # 使用map向函数传入多个参数(每传入一个参数则会调用一次函数,同时调用的数量由进程数决定)
    pool.imap(proc, range(5))
    # 关闭进程池
    pool.close()
    # 等待子进程结束
    pool.join()
    print('========== 结束 ==========')

使用高并发需要注意以下几点:

5. 进程同步

进程同步是指多个进程在共享资源时的协调与同步。单个进程运行时 ,程序的执行是顺序的;多个进程并发执行时可能会造成资源竞争、死锁等问题。因此,进程同步是保证每个进程在使用共享资源时的顺序和正确性。

5.1. 进程加锁的方式

两个进程间实现同步有2种方式,1、手动加锁(释放锁);2、with自动加锁(释放锁)

手动加锁方式

from multiprocessing import Lock

def func():
    # 手动加锁
    Lock().acquire()

    '''执行程序'''
    pass

    # 手动释放锁
    Lock().release()

with 自动加锁(释放锁)

from multiprocessing import Lock

def func():
    # with自动加锁(释放锁)
    with Lock():
        '''执行程序'''
        pass

5.2. 实现进程同步

实现逻辑

代码如下

from multiprocessing import Process, Lock, Value
from time import sleep

def func1(lock, shared_var):
    for i in range(3):
        with lock:
            shared_var.value -= 1
            print(f'[func1] 共享变量var当前值为:{shared_var.value}')
            sleep(1)

def func2(lock, shared_var):
    for i in range(3):
        with lock:
            shared_var.value += 2
            print(f'[func2] 共享变量var当前值为:{shared_var.value}')
            sleep(1)

if __name__ == '__main__':
    lock = Lock()
    shared_var = Value('i', 10)

    # 定义子进程
    p1 = Process(target=func1, args=(lock, shared_var))
    p2 = Process(target=func2, args=(lock, shared_var))

    # 启动子进程
    p1.start()
    p2.start()

    # 等待子进程结束
    p1.join()
    p2.join()

    print(f'最终共享变量var为:{shared_var.value}')

错误示例(使用了多个 Lock() )

from multiprocessing import Process, Lock, Value
from time import sleep

def func1(shared_var):
    '''定义子进程,让共享变量-1,执行3次'''
    for i in range(3):
        with Lock():    #错误语句
            shared_var.value -= 1
            print(f'[func1] 共享变量var当前值为:{shared_var.value}')
            sleep(1)

def func2(shared_var):
    '''定义子进程,让共享变量+2,执行3次'''
    for i in range(3):
        with Lock():    #错误语句
            shared_var.value += 2
            print(f'[func2] 共享变量var当前值为:{shared_var.value}')
            sleep(1)

if __name__ == '__main__':
    shared_var = Value('i', 10)

    # 定义子进程
    p1 = Process(target=func1, args=(shared_var,))
    p2 = Process(target=func2, args=(shared_var,))

    # 启动子进程
    p1.start()
    p2.start()

    # 等待子进程结束
    p1.join()
    p2.join()

    print(f'最终共享变量var为:{shared_var.value}')

按同步逻辑执行的过程应该是:共享变量 = 10(初始) - 1 + 2 - 1 + 2 - 1 + 2 = 13,但实际的值却是11。下图所示,其中一个步骤出错,并没有加锁。

仔细翻看代码,发现在每个函数中都使用了 Lock() 方法,也就是说2个函数使用的是2个锁,自然不会加锁,出现了脏读现象。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文