python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python之multiprocessing包

Python之multiprocessing包使用及说明

作者:BBluster

multiprocessing是 Python标准库中的的一个包,支持进程间的并发执行,它提供了一个类似 threading 的 API,允许创建新的进程,每个进程有自己的 Python 解释器器和内存空间,通过使用multiprocessing,开发者可以利用多核或多 CPU 的系统,进行 CPU 密集型任务

简介

multiprocessing 是 Python 标准库中的一个包,它支持进程间的并发执行,提供了一个与 threading 模块相似的API。

通过使用 multiprocessing,开发者可以在 Python 程序中创建新的进程,每个进程都拥有自己的 Python 解释器和内存空间,从而能够并行执行任务。

这在进行CPU密集型任务时尤其有用,因为它可以绕过全局解释器锁(GIL),让多核或多CPU的系统可以被充分利用.

主要功能

进程管理

进程间通信(IPC)

同步原语

进程池

共享状态

其他组件

功能详解

进程管理

Process类

构造方法

init(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

实例方法

start()

join(timeout=None)

is_alive()

terminate()

kill()

close()

属性

exitcode

name

daemon

pid

示例

from multiprocessing import Process
import time

def worker(name, sleep_time):
    print(f"Started worker {name}")
    time.sleep(sleep_time)
    print(f"Sleep time :{sleep_time}")

if __name__ == "__main__":
    for i in range(3):
        process = Process(target=worker, args=(f'Bob-{i}',i), name=f'Worker-{i}')
        process.start()  # 启动进程
        print(f"进程是否存活:{process.is_alive()}")
        print(time.time())

>>>
进程是否存活:True
1734319313.823571
进程是否存活:True
1734319313.824584
进程是否存活:True
1734319313.825512
Started worker Bob-1
Started worker Bob-2
Started worker Bob-0
Sleep time :0
Sleep time :1
Sleep time :2

进程间通信(IPC)

Queue类

Queue 类是 multiprocessing 模块中提供的一个重要组件,用于实现进程间通信(IPC)。通过队列,不同的进程可以安全地交换信息或数据。

构造方法

Queue(maxsize=0)

实例方法

put(item, block=True, timeout=None)

get(block=True, timeout=None)

empty()

full()

qsize()

close()

join_thread()

cancel_join_thread()

示例

from multiprocessing import Process, Queue

def worker(q, msg):
    q.put(msg)

if __name__ == "__main__":
    msg = "Hello from parent process"
    q = Queue()
    p_1 = Process(target=worker, args=(q, f"{msg}-1"))
    p_2 = Process(target=worker, args=(q, f"{msg}-2"))
    p_1.start()
    p_2.start()
    p_1.join()
    p_2.join()
    while not q.empty():
        print(q.get()) 
    
>>>
Hello from parent process-1
Hello from parent process-2

注意事项

Pipe函数

Pipe 函数是 multiprocessing 模块中用于进程间通信的另一种主要机制。它创建了一对连接对象,默认情况下是双向的(双工),但也可以配置为单向(半双工)。通过管道,进程可以相互发送和接收 Python 对象。Pipe 通常用于两个进程间的通信。

Pipe() 函数

multiprocessing.Pipe(duplex=True)

Pipe() 函数返回一对 Connection 对象,代表管道的两端。

Connection 对象的方法

管道两端的 Connection 对象提供了一组用于通信的方法:

send(obj)

recv()

fileno()

close()

poll(timeout=None)

示例

from multiprocessing import Process, Pipe

def worker(conn):
    conn.send([42, None, 'hello from child'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    
    print(parent_conn.recv())   # 输出: [42, None, 'hello from child']
    parent_conn.close()
    
    p.join()

>>>
[42, None, 'hello from child']

注意事项

同步原语

Lock类

Lock 类是 multiprocessing 模块中提供的一种简单的同步原语。

它用于控制多个进程对共享资源的访问,以防止资源的并发访问导致数据错乱或损坏。Lock 类似于线程模块中的锁,但它是为进程间同步而设计的。

构造方法

Lock 类没有特定的构造参数,你可以直接创建一个 Lock 实例

from multiprocessing import Lock
lock = Lock()

实例方法

acquire(block=True, timeout=-1)

release()

locked()

示例

from multiprocessing import Process, Lock
import time

# 定义一个简单的共享资源访问函数
def worker_with(lock, num):
    with lock:
        print(f"Worker {num} has acquired the lock")
        time.sleep(1)
        print(f"Worker {num} is releasing the lock")

if __name__ == "__main__":
    lock = Lock()
    processes = [Process(target=worker_with, args=(lock, n)) for n in range(5)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print("Processing complete.")

>>>
确保了在任何时刻只有一个进程能够访问共享资源,从而避免了并发访问的问题

注意事项

Event类

Event 类是 Python multiprocessing 模块中提供的一个同步原语,用于在进程之间通信和协调。它模拟了线程模块中的 Event 对象,允许一个进程向其他多个进程发出某个事件的发生,这对于在多个进程之间同步操作非常有用

构造方法

创建一个 Event 对象很简单,不需要任何参数

from multiprocessing import Event
event = Event()

实例方法

set()

clear()

is_set()

wait(timeout=None)

示例

from multiprocessing import Process, Event
import time

def worker(event, id):
    print(f"Worker {id} waiting for event.")
    event.wait()  # 阻塞,直到事件被设置。
    print(f"Worker {id} received event.")

if __name__ == "__main__":
    event = Event()

    # 创建并启动三个工作进程
    workers = [Process(target=worker, args=(event, i)) for i in range(3)]
    for w in workers:
        w.start()

    print("Main process doing some work.")
    time.sleep(2)  # 模拟主进程工作一段时间

    event.set()  # 设置事件,唤醒所有等待的进程
    print("Main process triggered the event.")

    for w in workers:
        w.join()  # 等待所有工作进程完成

    print("Main process exiting.")
    
>>>
Main process doing some work.
Worker 0 waiting for event.
Worker 1 waiting for event.
Worker 2 waiting for event.
Main process triggered the event.
Worker 0 received event.
Worker 1 received event.
Worker 2 received event.
Main process exiting.

注意事项

Semaphore类

Semaphore 类是 multiprocessing 模块中提供的一种同步原语,用于控制对共享资源的访问数量。它可以被视为一个可用资源的计数器,是一种更为通用的同步机制,可以用来解决多个进程访问有限数量的资源问题。

构造方法

Semaphore 对象在创建时可以接受一个可选的整数值,用于指定信号量的初始值,即同时可以访问共享资源的进程数量。如果不指定,则默认值为1,此时它的行为类似于 Lock

from multiprocessing import Semaphore

sem = Semaphore(value=3)

实例方法

acquire(block=True, timeout=None)

尝试减少信号量计数器的值(即获取资源)。如果计数器的值大于0,则减1并立即返回 True。如果计数器的值为0,则根据 block 参数的值行为会有所不同:

release()

get_value() (在某些Python版本中不推荐使用)

示例

from multiprocessing import Process, Semaphore
import time

sem = Semaphore(2)  # 最多允许2个进程同时访问共享资源

def worker(sem, num):
    with sem:
        print(f"Worker {num} is working.")
        time.sleep(num)  # 模拟耗时操作
    print(f"Worker {num} finished.")

if __name__ == "__main__":
    workers = [Process(target=worker, args=(sem, i)) for i in range(5)]

    for w in workers:
        w.start()

    for w in workers:
        w.join()

    print("All workers completed.")

>>>
Worker 0 is working.
Worker 0 finished.
Worker 4 is working.
Worker 3 is working.
Worker 3 finished.
Worker 2 is working.
Worker 4 finished.
Worker 1 is working.
Worker 1 finished.
Worker 2 finished.
All workers completed.

注意事项

Condition类

Condition 类是 multiprocessing 模块中提供的一个同步原语,用于在进程之间等待某些条件的满足。它允许一个或多个进程等待某个条件变为真,而在条件满足时能够通知一个或多个等待的进程继续执行。Condition 常常与共享资源或状态变更相关的场景一起使用,提供了一种更加灵活的进程同步机制

构造方法

在创建 Condition 对象时,可以选择传递一个 Lock 或 RLock 对象用于内部使用。如果不传递,则 Condition 对象会自动创建一个新的 RLock 对象

from multiprocessing import Condition

cond = Condition()

实例方法

acquire(*args, **kwargs)

release()

wait(timeout=None)

notify(n=1)

notify_all()notifyAll()

示例

from multiprocessing import Process, Condition, Array
import time

def producer(cond, shared_array):
    with cond:
        print("Producer adding item.")
        shared_array[0] += 1
        print("Producer done, item added.")
        cond.notify()

def consumer(cond, shared_array):
    with cond:
        print("Consumer waiting for item.")
        cond.wait()
        print("Consumer consumed item.")
        shared_array[0] -= 1

if __name__ == "__main__":
    condition = Condition()
    shared_array = Array('i', [0])

    p = Process(target=producer, args=(condition, shared_array))
    c = Process(target=consumer, args=(condition, shared_array))

    c.start()
    time.sleep(2)  # 确保消费者先运行
    p.start()

    p.join()
    c.join()

    print("Final shared value:", shared_array[0])
>>>
Consumer waiting for item.
Producer adding item.
Producer done, item added.
Consumer consumed item.
Final shared value: 0

注意事项

进程池

Pool类

Pool 类是 Python 的 multiprocessing 模块提供的一个高级接口,用于并行执行多个进程。Pool 类可以自动管理进程池中的进程数量,让你轻松地将任务分配给进程池执行,而无需手动管理每个进程的创建和终止。这对于执行 CPU 密集型任务特别有用

构造方法

创建 Pool 对象时,可以指定几个参数,最常用的是 processes 参数,它指定了进程池中进程的数量。如果不指定,默认为机器的 CPU 核心数

from multiprocessing import Pool

pool = Pool(processes=4)

实例方法

apply(func, args=(), kwds={})

apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

map(func, iterable, chunksize=None)

map_async(func, iterable, chunksize=None, callback=None, error_callback=None)

close()

join()

示例

from multiprocessing import Pool
import time
import os

def square(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    print(time.time())
    # 创建一个包含4个进程的进程池
    with Pool(processes=os.cpu_count()) as p:
        # 使用 map 函数分配任务
        results = p.map(square, range(10))
        print(results)
        # 使用 apply_async 异步执行任务
        async_result = p.apply_async(square, (10,))
        print(async_result.get())  # 使用 get 方法获取异步任务的结果
    print(time.time())
>>>
1734330507.251192
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
100
1734330510.379658

注意事项

共享状态

Value和Array类

在 Python 的 multiprocessing 模块中,ValueArray 是两个用于进程间共享数据的类。由于进程间共享内存并非像线程间共享全局变量那样简单,这两个类提供了一种在不同进程间安全共享数据的方法。

Value

Value 类用于在进程之间共享一个存储在共享内存中的单一数据值。它适用于当你只需要共享一个简单的数据元素,如一个整数或浮点数时。

使用方法:创建 Value 对象时,需要指定数据类型和初始值。数据类型是使用类似于 C 语言类型声明的字符串来指定的,例如 ‘i’ 表示整数,‘d’ 表示双精度浮点数。

from multiprocessing import Value

# 创建一个共享的整数变量,初始值为 0
num = Value('i', 0)

访问和修改值:可以通过 .value 属性来访问和修改存储在 Value 中的数据。

# 读取值
print(num.value)

# 修改值
num.value = 10

Array

Array 类用于在进程之间共享一个存储在共享内存中的数组。它适用于当你需要共享一组数据(如一系列整数或浮点数)时。

使用方法:创建 Array 对象时,同样需要指定数据类型和数组的大小。数据类型的指定方式与 Value 类似,数组的大小则通过一个整数来指定。

from multiprocessing import Array

# 创建一个共享的整数数组,包含 10 个元素,初始值都为 0
arr = Array('i', 10)

访问和修改数组元素:Array 支持类似于列表的索引和切片操作来访问和修改数组中的元素。

# 读取第一个元素
print(arr[0])

# 修改第一个元素
arr[0] = 1

# 获取数组的长度
print(len(arr))

# 使用切片
arr[1:3] = [2, 3]

Manager类

Manager 类在 Python multiprocessing 模块中提供了一种方式,允许你在不同进程间共享数据。与 Value 和 Array 直接在共享内存中存储数据不同,Manager 类创建的数据结构是通过一个服务器进程管理的。这意味着它允许更加灵活的数据共享方式,不仅限于简单的数值或数组,也支持列表、字典、Namespace 等更复杂的数据结构。

基本使用

使用 Manager 类时,首先需要创建一个 Manager 对象,然后使用这个对象来创建共享的数据结构。

from multiprocessing import Manager

with Manager() as manager:
    # 创建一个共享的列表
    shared_list = manager.list([0, 1, 2])
    
    # 创建一个共享的字典
    shared_dict = manager.dict({'a': 1, 'b': 2})

创建的共享数据结构可以像普通的数据结构那样被访问和修改,但是它们实际上是在一个单独的服务器进程中维护的。这意味着你可以安全地在多个进程间共享和修改这些数据结构,而无需担心进程安全问题。

Manager 支持的类型

Manager 类支持多种类型的共享数据结构,包括但不限于:

示例代码

以下示例展示了如何使用 Manager 类在进程间共享列表和字典:

from multiprocessing import Process, Manager

def worker(shared_list, shared_dict):
    shared_list.append('new item')
    shared_dict['new_key'] = 'new value'

if __name__ == "__main__":
    with Manager() as manager:
        shared_list = manager.list([1, 2, 3])
        shared_dict = manager.dict({'key1': 'value1', 'key2': 'value2'})
        
        p = Process(target=worker, args=(shared_list, shared_dict))
        p.start()
        p.join()
        
        print(f"Shared list: {shared_list}")
        print(f"Shared dict: {shared_dict}")

其他组件

multiprocessing 模块提供了一些用于查询系统和进程状态的函数,这些函数对于编写并发程序时了解资源利用情况和进程管理非常有用。以下是对 cpu_count、current_process 和 active_children 三个方法的详细解释:

cpu_count()

cpu_count 函数返回机器上可用的 CPU 核心数。这个信息对于决定并行程序中进程池(Pool)的大小非常有用,因为你可能想要根据可用的处理器数量来优化你的程序性能。

使用场景:在创建进程池时,可以根据 cpu_count 的返回值来设置进程池的大小。例如,如果你的机器有 4 个 CPU 核心,那么创建一个包含 4 个进程的进程池可能是一个合理的选择。

from multiprocessing import cpu_count

print(f"Number of CPUs: {cpu_count()}")

current_process()

current_process 函数返回当前进程的信息。这个函数返回一个 Process 对象,其中包含有关当前进程的详细信息,如进程的名称、PID(进程ID)等。

使用场景:这个函数在调试并发程序时特别有用,因为你可以用它来识别当前正在执行的进程。例如,在多进程环境中打印日志时,可能会包含进程的名称或 PID 来区分日志消息是从哪个进程生成的。

from multiprocessing import current_process

process = current_process()
print(f"Current process name: {process.name}")
print(f"Current process ID: {process.pid}")

active_children()

active_children 函数返回一个列表,包含当前活跃的子进程对象。每个子进程对象都包含有关该进程的信息,如名称、PID 等。这个函数可以在父进程中调用,以获取当前所有活跃的子进程列表。

使用场景:在管理和监控基于 multiprocessing 的并发程序时,active_children 函数可以帮助你了解有多少子进程正在运行。这对于确保所有预期内的子进程都已启动,或者在程序结束时检查是否还有未完成的子进程非常有用。

from multiprocessing import Process, active_children
import time

def worker():
    print("Worker sleeping...")
    time.sleep(2)
    print("Worker done.")

if __name__ == "__main__":
    for _ in range(2):
        p = Process(target=worker)
        p.start()
    
    time.sleep(1)  # 等待一段时间,让子进程启动
    for child in active_children():
        print(f"Active child: PID={child.pid}, Name={child.name}")

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文