
A Complete Guide to Creating Multiple Threads in Python Child Processes

Author: Yant224

In an operating system, a process is the basic unit of resource allocation, with its own memory space, file descriptors, and other system resources, while a thread is an execution unit inside a process that shares its process's resources. This article is a complete guide to creating multiple threads inside Python child processes.

1. Understanding the Relationship Between Processes and Threads

1.1 Basic Concepts of Processes and Threads

In an operating system, a process is the basic unit of resource allocation: each process has its own memory space, file descriptors, and other system resources. A thread is an execution unit within a process, and the threads of a process share its resources.

What makes Python special is the Global Interpreter Lock (GIL): within a single process, multiple threads cannot truly execute CPU-bound work in parallel. For I/O-bound work, however, multithreading can still improve performance significantly.
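
A quick way to observe this is to time the same thread count on both kinds of work; a minimal, self-contained sketch (timings are illustrative and vary by machine):

import threading
import time

def cpu_bound(n=5_000_000):
    # Pure Python arithmetic holds the GIL, so threads serialize on it
    total = 0
    for i in range(n):
        total += i
    return total

def io_bound(seconds=0.5):
    # time.sleep releases the GIL, so threads overlap almost perfectly
    time.sleep(seconds)

def timed(target, workers=4):
    threads = [threading.Thread(target=target) for _ in range(workers)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"CPU-bound, 4 threads: {timed(cpu_bound):.2f}s")  # roughly 4x a single call
    print(f"I/O-bound, 4 threads: {timed(io_bound):.2f}s")   # roughly one sleep's duration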

1.2 The Architecture of Multithreading Inside Child Processes

The advantages of this architecture: each child process has its own interpreter and its own GIL, so CPU-bound work runs in parallel across cores, while the threads inside each process share that process's memory and provide cheap concurrency for I/O-bound work, as the sketch below illustrates.
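
A minimal sketch of the model (the io_task function is a hypothetical stand-in for real work; Section 3 expands this into complete patterns):

import multiprocessing
import threading

def io_task(task_id):
    # Threads inside one child process share that process's memory
    print(f"thread {task_id} in process {multiprocessing.current_process().name}")

def child():
    # Each child process runs its own batch of threads
    threads = [threading.Thread(target=io_task, args=(i,)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

if __name__ == "__main__":
    workers = [multiprocessing.Process(target=child) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()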

2. Implementation Principles and Technical Details

2.1 Python's multiprocessing Module

Python provides the multiprocessing module for creating and managing processes:

import multiprocessing
import os

def worker():
    """Report this worker's process ID and name."""
    print(f"Process ID: {os.getpid()}, process name: {multiprocessing.current_process().name}")

if __name__ == "__main__":
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, name=f"Process-{i}")
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
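
One platform detail worth settling before the longer examples: the default start method differs across systems ('spawn' on Windows and macOS, 'fork' on most Linux builds), which is why every example below guards its entry point with if __name__ == "__main__". If you want consistent behavior everywhere, you can pin the start method explicitly; a small sketch, assuming 'spawn' is acceptable for your workload:

import multiprocessing

def hello():
    print(f"hello from child {multiprocessing.current_process().name}")

if __name__ == "__main__":
    # Must be called before any Process or Pool is created, and at most once
    multiprocessing.set_start_method("spawn")
    p = multiprocessing.Process(target=hello)
    p.start()
    p.join()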

2.2 Inter-Process Communication (IPC) Mechanisms

Because each process has its own memory space, data must be exchanged through dedicated communication mechanisms:

Mechanism        Description                   Typical use case
Queue            First-in, first-out queue     Producer-consumer patterns
Pipe             Bidirectional channel         One-to-one communication
Shared Memory    Shared memory region          High-performance data sharing
Manager          Managed shared objects        Sharing complex data structures
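
Queue gets a full treatment in Section 4, and the Manager row appears in practice in Section 3.3; the other rows have similarly small APIs. A brief sketch of Pipe and a shared Value:

import multiprocessing

def child(conn, counter):
    # Pipe end: receive a message and send a reply back
    msg = conn.recv()
    conn.send(msg.upper())
    # Shared memory: the built-in lock guards the read-modify-write
    with counter.get_lock():
        counter.value += 1

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()  # bidirectional channel
    counter = multiprocessing.Value("i", 0)           # a shared int
    p = multiprocessing.Process(target=child, args=(child_conn, counter))
    p.start()
    parent_conn.send("ping")
    print(parent_conn.recv())   # -> PING
    p.join()
    print(counter.value)        # -> 1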

3. Three Implementation Methods in Detail

3.1 Method 1: The Basic Combination

import multiprocessing
import threading
import time

def thread_task(thread_id):
    """Thread worker function."""
    print(f"Thread {thread_id} running in process {multiprocessing.current_process().name}")
    time.sleep(2)
    return f"Thread {thread_id} finished"

def process_task():
    """Process worker function: start several threads and wait for them all."""
    print(f"Process {multiprocessing.current_process().name} started")
    
    # Create and start several threads (threading.Thread discards return
    # values; see Method 3 for collecting per-item results)
    threads = []
    
    for i in range(4):
        t = threading.Thread(target=thread_task, args=(i,))
        threads.append(t)
        t.start()
    
    # Wait for all threads to finish
    for t in threads:
        t.join()
    
    print(f"进程 {multiprocessing.current_process().name} 结束")

if __name__ == "__main__":
    # Create 3 child processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=process_task, name=f"SubProcess-{i}")
        processes.append(p)
        p.start()
    
    # Wait for all child processes to finish
    for p in processes:
        p.join()
    
    print("所有进程完成")

3.2 Method 2: Process Pools and Thread Pools

import concurrent.futures
import multiprocessing
import threading
import time

def thread_worker(data):
    """Thread-pool worker function."""
    process_name = multiprocessing.current_process().name
    thread_name = threading.current_thread().name
    time.sleep(0.5)  # simulate some work
    return f"{process_name}-{thread_name} processed: {data}"

def process_worker(data_chunk):
    """Process-pool worker function."""
    print(f"Process {multiprocessing.current_process().name} handling {len(data_chunk)} items")
    
    # Process the chunk with a thread pool
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(thread_worker, data_chunk))
    
    return results

if __name__ == "__main__":
    # Prepare the data
    all_data = [f"data_{i}" for i in range(20)]
    chunk_size = 5
    data_chunks = [all_data[i:i+chunk_size] for i in range(0, len(all_data), chunk_size)]
    
    # Use a process pool
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_worker, chunk) for chunk in data_chunks]
        
        # Collect the results
        all_results = []
        for future in concurrent.futures.as_completed(futures):
            all_results.extend(future.result())
    
    print("处理完成,结果:")
    for result in all_results:
        print(f"  {result}")

3.3 Method 3: A Custom Process Subclass

import multiprocessing
import threading
import time

class ThreadedProcess(multiprocessing.Process):
    def __init__(self, task_id, data_list, results):
        super().__init__()
        self.task_id = task_id
        self.data_list = data_list
        # Results go into a Manager().list() proxy created by the parent and
        # passed in; creating the Manager here would let it be garbage-collected
        # (shutting down its process) while the children still need the proxy
        self.results = results
    
    def run(self):
        print(f"进程 {self.name} 开始处理任务 {self.task_id}")
        
        # Create the threads
        threads = []
        for i, data in enumerate(self.data_list):
            thread = threading.Thread(
                target=self.process_item,
                args=(data, i)
            )
            threads.append(thread)
            thread.start()
        
        # Wait for all threads to finish
        for thread in threads:
            thread.join()
        
        print(f"进程 {self.name} 完成任务 {self.task_id}")
    
    def process_item(self, data, index):
        """Process a single data item."""
        thread_name = threading.current_thread().name
        print(f"Thread {thread_name} processing: {data}")
        time.sleep(0.3)  # simulate processing time
        
        # Process the data and record the result
        result = {
            'index': index,
            'data': data,
            'processed': data.upper(),  # example transformation
            'thread': thread_name,
            'process': self.name
        }
        self.results.append(result)

if __name__ == "__main__":
    # Prepare the task data
    tasks = [
        (1, ['apple', 'banana', 'cherry']),
        (2, ['dog', 'elephant', 'fox']),
        (3, ['green', 'blue', 'red', 'yellow'])
    ]
    
    # Keep the Manager alive in the parent for as long as the children need it
    with multiprocessing.Manager() as manager:
        # Create and start the processes
        processes = []
        for task_id, data_list in tasks:
            process = ThreadedProcess(task_id, data_list, manager.list())
            processes.append(process)
            process.start()
        
        # Wait for all processes to finish
        for process in processes:
            process.join()
        
        # Aggregate the results while the manager is still running
        all_results = []
        for process in processes:
            all_results.extend(list(process.results))
    
    print("\nAll tasks finished; aggregated results:")
    for result in all_results:
        print(f"  {result['process']} item {result['index']}: {result['data']} -> {result['processed']}")

4. Inter-Process Communication in Practice

4.1 Using a Queue for Inter-Process Communication

import multiprocessing
import threading
import time
import random
from queue import Empty  # multiprocessing.Queue.get raises queue.Empty on timeout

def producer(queue, producer_id):
    """Producer thread function."""
    for i in range(5):
        item = f"producer {producer_id} - item {i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    queue.put(f"producer {producer_id} - done")

def consumer_process(queue, consumer_id):
    """Consumer process function."""
    print(f"Consumer process {consumer_id} started")
    
    completed_producers = 0
    # 2 producer processes x 2 threads each emit 4 "done" markers in total;
    # each of the 2 consumers stops after seeing 2 (the timeout below guards
    # against the markers splitting unevenly between consumers)
    total_producers = 2
    
    while completed_producers < total_producers:
        try:
            item = queue.get(timeout=5)
            if item.endswith("- done"):
                completed_producers += 1
                print(f"Consumer {consumer_id} got done marker: {item}")
            else:
                print(f"Consumer {consumer_id} processing: {item}")
                time.sleep(random.uniform(0.2, 0.8))  # simulate processing time
        except Empty:  # the queue parameter shadows the queue module, hence the direct import
            print(f"Consumer {consumer_id} timed out")
            break
    
    print(f"Consumer process {consumer_id} finished")

def producer_process(queue, process_id):
    """Producer process function."""
    print(f"Producer process {process_id} started")
    
    # Create several producer threads inside this process
    producer_threads = []
    for i in range(2):  # 2 producer threads per process
        thread = threading.Thread(
            target=producer,
            args=(queue, f"P{process_id}-T{i}")
        )
        producer_threads.append(thread)
        thread.start()
    
    # Wait for all producer threads to finish
    for thread in producer_threads:
        thread.join()
    
    print(f"Producer process {process_id} finished")

if __name__ == "__main__":
    # Create the inter-process communication queue
    queue = multiprocessing.Queue(maxsize=10)
    
    # Create the producer processes
    producer_processes = []
    for i in range(2):
        p = multiprocessing.Process(
            target=producer_process,
            args=(queue, i)
        )
        producer_processes.append(p)
        p.start()
    
    # Create the consumer processes
    consumer_processes = []
    for i in range(2):
        c = multiprocessing.Process(
            target=consumer_process,
            args=(queue, i)
        )
        consumer_processes.append(c)
        c.start()
    
    # Wait for all processes to finish
    for p in producer_processes:
        p.join()
    
    for c in consumer_processes:
        c.join()
    
    print("所有生产消费任务完成")

5. Performance Optimization and Best Practices

5.1 Resource Management Strategies

1. Set process and thread counts sensibly

import os

# Scale the process count with the number of CPU cores
cpu_count = os.cpu_count()
process_pool_size = max(1, cpu_count - 1)  # leave one core for the system

# Scale the thread count with the task type
task_type = "io_intensive"  # example value; derive this from your workload
if task_type == "io_intensive":
    thread_pool_size = 10  # I/O-bound work tolerates many more threads
else:
    thread_pool_size = cpu_count  # CPU-bound work gains little from extra threads

2. Manage resources with connection pools

from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor
import database  # hypothetical database module, for illustration only

# One connection pool per worker process
process_conn_pool = None

def init_process():
    global process_conn_pool
    process_conn_pool = database.ConnectionPool(max_connections=5)

def thread_task(query):
    # Borrow a connection from the process-level pool
    conn = process_conn_pool.get_connection()
    try:
        result = conn.execute(query)
        return result
    finally:
        process_conn_pool.release_connection(conn)

def process_worker(queries):
    # ThreadPoolExecutor lives in concurrent.futures, not threading
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(thread_task, queries))
    return results

if __name__ == "__main__":
    queries = [f"SELECT * FROM table WHERE id = {i}" for i in range(10)]

    with Pool(processes=2, initializer=init_process) as pool:
        results = pool.map(process_worker, [queries[:5], queries[5:]])

5.2 Error Handling and Retry Mechanisms

import multiprocessing
import threading
import time
import random  # used by reliable_thread_task below
from functools import wraps

def retry(max_attempts=3, delay=1):
    """Retry decorator: retry up to max_attempts times with a fixed delay."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    print(f"尝试 {attempts} 失败: {e}")
                    if attempts < max_attempts:
                        time.sleep(delay)
                    else:
                        raise
        return wrapper
    return decorator

@retry(max_attempts=3, delay=2)
def reliable_thread_task(data):
    """A thread task wrapped with retries."""
    # Simulate an operation that sometimes fails
    if random.random() < 0.3:  # fails 30% of the time
        raise ValueError("random failure")
    
    time.sleep(0.5)
    return f"Processed successfully: {data}"

def robust_process():
    """A process function with defensive error handling."""
    try:
        threads = []
        results = []
        
        for i in range(5):
            # Bind i at lambda-definition time (i=i); a bare closure would see
            # the loop's final value in every thread
            t = threading.Thread(
                target=lambda i=i: results.append(reliable_thread_task(f"data-{i}"))
            )
            threads.append(t)
            t.start()
        
        for t in threads:
            t.join()
        
        print(f"Results: {results}")
        
    except Exception as e:
        # Note: an exception raised inside a thread never reaches this handler;
        # see the futures-based sketch below for propagating failures
        print(f"Process failed: {e}")

if __name__ == "__main__":
    processes = []
    for i in range(2):
        p = multiprocessing.Process(target=robust_process)
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
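
One caveat with the bare-thread version: an exception raised inside a thread (including the final re-raise from the retry decorator) is printed to stderr but never reaches the except block in robust_process. If failures must propagate to the caller, futures carry them back; a sketch reusing reliable_thread_task from above:

import concurrent.futures

def robust_process_with_futures():
    """Collect results and exceptions from the worker threads via futures."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(reliable_thread_task, f"data-{i}"): i
                   for i in range(5)}
        for future in concurrent.futures.as_completed(futures):
            try:
                print(f"Result: {future.result()}")  # re-raises the worker's exception here
            except Exception as e:
                print(f"Task {futures[future]} ultimately failed: {e}")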

6. A Practical Application: Handling Web Service Requests

import multiprocessing
import threading
import time
import random
from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Simulate processing time
        processing_time = random.uniform(0.1, 1.0)
        time.sleep(processing_time)
        
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        
        response = {
            'path': self.path,
            'processing_time': processing_time,
            'process': multiprocessing.current_process().name,
            'thread': threading.current_thread().name
        }
        
        self.wfile.write(json.dumps(response).encode())

def run_server(port):
    """Run an HTTP server."""
    server = HTTPServer(('localhost', port), RequestHandler)
    print(f"Server started in process {multiprocessing.current_process().name} on port {port}")
    server.serve_forever()

def health_checker(server_ports):
    """Health-check thread."""
    while True:
        time.sleep(5)
        print(f"Health check: server process alive, watching ports {server_ports}")

def server_process(port):
    """Server process function."""
    # Create the server thread
    server_thread = threading.Thread(
        target=run_server,
        args=(port,),
        daemon=True
    )
    
    # Create the health-check thread
    health_thread = threading.Thread(
        target=health_checker,
        args=([port],),
        daemon=True
    )
    
    server_thread.start()
    health_thread.start()
    
    # Block on the server thread (serve_forever never returns)
    server_thread.join()

if __name__ == "__main__":
    # Start several server processes, one per port
    ports = [8000, 8001, 8002]
    processes = []
    
    for port in ports:
        p = multiprocessing.Process(
            target=server_process,
            args=(port,),
            name=f"ServerProcess-{port}"
        )
        processes.append(p)
        p.start()
    
    print(f"启动了 {len(processes)} 个服务器进程")
    
    try:
        # Keep the main process alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down servers...")
        for p in processes:
            p.terminate()
        for p in processes:
            p.join()
        print("All servers stopped")

That concludes this complete guide to creating multiple threads in Python child processes. For more on the topic, see the other related articles on jb51.net (脚本之家)!
