python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python multiprocessing多进程并行计算

Python使用multiprocessing模块实现多进程并行计算

作者:彬彬侠

Python的multiprocessing模块是一个标准库模块,用于实现多进程并行计算,相比线程(threading 模块),multiprocessing更适合需要高性能计算的场景,本文将详细介绍multiprocessing模块的定义、功能、用法、示例、应用场景、最佳实践和注意事项,需要的朋友可以参考下

引言

Python 的 multiprocessing 模块是一个标准库模块,用于实现多进程并行计算。它通过创建独立的进程,绕过 Python 的全局解释器锁(GIL),在多核 CPU 上实现真正的并行,特别适合 CPU 密集型任务(如数值计算、图像处理)。相比线程(threading 模块),multiprocessing 更适合需要高性能计算的场景。本文将详细介绍 multiprocessing 模块的定义、功能、用法、示例、应用场景、最佳实践和注意事项。

1. multiprocessing 模块的定义和原理

1.1 定义

multiprocessing 是一个跨平台的模块,提供创建和管理进程的 API,支持进程间通信(IPC)、同步机制和共享资源管理。它模仿了 threading 模块的接口,方便开发者从线程迁移到进程。

核心功能

依赖:标准库,无需额外安装。

1.2 原理

1.3 导入

import multiprocessing

2. multiprocessing 的核心组件和功能

2.1 进程创建(Process)

通过 multiprocessing.Process 创建进程,运行指定函数。

构造函数

Process(target=None, args=(), kwargs={}, name=None, daemon=None)

主要方法

示例

import multiprocessing

def worker(num):
    print(f"Worker {num} running in process {multiprocessing.current_process().name}")

if __name__ == "__main__":
    processes = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

输出(顺序可能不同):

Worker 0 running in process Process-1
Worker 1 running in process Process-2
Worker 2 running in process Process-3

2.2 进程池(Pool)

Pool 用于管理固定数量的进程,适合并行处理大量任务。

构造函数

Pool(processes=None, initializer=None, initargs=())

主要方法

示例

from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(square, range(10))
    print(results)  # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.3 进程通信

支持 PipeQueue 实现进程间数据交换。

Pipe

构造函数

Pipe(duplex=True)

示例

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from sender")
    conn.close()

def receiver(conn):
    print(conn.recv())
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

输出

Hello from sender

Queue

构造函数

Queue(maxsize=0)

示例

from multiprocessing import Process, Queue

def producer(queue):
    queue.put("Data from producer")

def consumer(queue):
    print(queue.get())

if __name__ == "__main__":
    queue = Queue()
    p1 = Process(target=producer, args=(queue,))
    p2 = Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

2.4 同步机制

提供锁、信号量等原语,确保进程安全访问共享资源。

Lock

from multiprocessing import Process, Lock

def printer(lock, msg):
    with lock:
        print(msg)

if __name__ == "__main__":
    lock = Lock()
    processes = [Process(target=printer, args=(lock, f"Message {i}")) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

Semaphore

from multiprocessing import Process, Semaphore

def worker(sem, name):
    with sem:
        print(f"{name} acquired resource")
        # 模拟工作

if __name__ == "__main__":
    sem = Semaphore(2)  # 允许 2 个进程同时访问
    processes = [Process(target=worker, args=(sem, f"Worker {i}")) for i in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

Event

from multiprocessing import Process, Event
import time

def wait_for_event(event):
    event.wait()
    print("Event triggered")

if __name__ == "__main__":
    event = Event()
    p = Process(target=wait_for_event, args=(event,))
    p.start()
    time.sleep(1)
    event.set()  # 触发事件
    p.join()

2.5 共享内存

通过 ValueArray 共享基本数据类型。

示例

from multiprocessing import Process, Value, Array

def modify(shared_num, shared_arr):
    shared_num.value += 1
    for i in range(len(shared_arr)):
        shared_arr[i] += 1

if __name__ == "__main__":
    num = Value("i", 0)  # 共享整数
    arr = Array("i", [1, 2, 3])  # 共享数组
    p = Process(target=modify, args=(num, arr))
    p.start()
    p.join()
    print(num.value)  # 输出: 1
    print(list(arr))  # 输出: [2, 3, 4]

3. 应用场景

数值计算

图像处理

机器学习

数据处理

爬虫

4. 示例:多进程爬虫

结合 urllibQueue 实现并行网页抓取。

示例

import urllib.request
from multiprocessing import Process, Queue
from urllib.error import URLError

def fetch_url(queue, url):
    try:
        with urllib.request.urlopen(url) as response:
            content = response.read().decode("utf-8")
            queue.put((url, len(content)))
    except URLError as e:
        queue.put((url, str(e)))

def main():
    urls = ["https://example.com", "https://python.org", "https://invalid-url"]
    queue = Queue()
    processes = [Process(target=fetch_url, args=(queue, url)) for url in urls]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    while not queue.empty():
        url, result = queue.get()
        print(f"{url}: {result}")

if __name__ == "__main__":
    main()

输出(示例):

https://example.com: 1256
https://python.org: 50000
https://invalid-url: [Errno 11001] getaddrinfo failed

5. 最佳实践

使用 if __name__ == "__main__":

示例

if __name__ == "__main__":
    p = Process(target=worker)
    p.start()

选择进程池

示例

with Pool(4) as pool:
    results = pool.map(func, data)

优化通信

示例

arr = Array("i", [0] * size)

异常处理

示例

def worker(queue):
    try:
        # 工作代码
    except Exception as e:
        queue.put(str(e))

测试代码

示例

import pytest
from multiprocessing import Process

def test_process():
    def worker():
        print("Test")
    p = Process(target=worker)
    p.start()
    p.join()
    assert p.exitcode == 0

进程数选择

示例

processes = min(len(tasks), multiprocessing.cpu_count())

6. 注意事项

GIL 限制

示例

# I/O 密集型:使用 asyncio
import asyncio
async def fetch():
    pass

Windows 兼容性

示例

if __name__ == "__main__":
    main()

资源管理

示例

with Pool() as pool:
    pool.map(func, data)

序列化开销

示例

shared_data = Value("d", 0.0)

调试难度

示例

import logging
logging.basicConfig(level=logging.INFO)

7. 总结

Python 的 multiprocessing 模块是实现多进程并行的强大工具,绕过 GIL,适合 CPU 密集型任务。其核心特点包括:

以上就是Python使用multiprocessing模块实现多进程并行计算的详细内容,更多关于Python multiprocessing多进程并行计算的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文