A Complete Guide to Creating Threads Inside Python Subprocesses
Author: Yant224
This article is a complete guide to creating multiple threads inside Python subprocesses, combining process-level and thread-level parallelism, with working examples for readers who need this pattern.
1. Understanding the Relationship Between Processes and Threads
1.1 Basic Concepts of Processes and Threads
In an operating system, the process is the basic unit of resource allocation: each process has its own memory space, file descriptors, and other system resources. A thread is an execution unit inside a process, and all threads of a process share that process's resources.
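A minimal sketch makes the difference concrete: a module-level counter bumped from a thread is visible afterwards in the main thread, while the same bump inside a subprocess only changes the child's copy:

```python
import multiprocessing
import threading

counter = 0

def bump():
    global counter
    counter += 1

if __name__ == "__main__":
    # A thread shares this process's memory, so the change is visible here
    t = threading.Thread(target=bump)
    t.start()
    t.join()
    print(counter)  # 1

    # A subprocess works on its own copy, so our counter is unchanged
    p = multiprocessing.Process(target=bump)
    p.start()
    p.join()
    print(counter)  # still 1
```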
Python's special twist is the **Global Interpreter Lock (GIL)**, which prevents threads within a single process from truly running CPU-bound code in parallel. For I/O-bound tasks, however, multithreading still brings significant speedups, because the GIL is released while a thread blocks on I/O.
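A quick timing sketch shows the effect; the `time.sleep` call below stands in for any blocking I/O such as a network request:

```python
import threading
import time

def io_task():
    time.sleep(0.5)  # Stand-in for a blocking I/O call (network, disk, ...)

# Sequential: 4 tasks * 0.5s each, roughly 2s total
start = time.perf_counter()
for _ in range(4):
    io_task()
print(f"sequential: {time.perf_counter() - start:.2f}s")

# Threaded: the GIL is released while blocking, so roughly 0.5s total
start = time.perf_counter()
threads = [threading.Thread(target=io_task) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"threaded:   {time.perf_counter() - start:.2f}s")
```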
1.2 The Architecture Model: Threads Inside Subprocesses
The model is straightforward: the main process spawns several subprocesses, and each subprocess runs multiple threads internally. The advantages of this architecture are:
- Full use of multi-core CPUs: each subprocess can run on a different CPU core
- A balance of sharing and isolation: threads share their process's resources, while processes stay isolated from one another
- Flexible task allocation: work can be parallelized at the process level or the thread level, depending on the task's characteristics
2. Implementation Principles and Technical Details
2.1 Python's multiprocessing Module
Python provides the `multiprocessing` module to create and manage processes:
```python
import multiprocessing
import os

def worker():
    print(f"PID: {os.getpid()}, process name: {multiprocessing.current_process().name}")

if __name__ == "__main__":
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, name=f"Process-{i}")
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
```
2.2 Inter-Process Communication (IPC) Mechanisms
Because each process has its own memory space, special communication mechanisms are required:

| Mechanism | Description | Typical Use Case |
| --- | --- | --- |
| Queue | First-in, first-out queue | Producer-consumer pattern |
| Pipe | Two-way communication channel | One-to-one communication |
| Shared Memory | Shared memory region | High-performance data sharing |
| Manager | Managed shared objects | Sharing complex data structures |
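`Queue` is demonstrated at length in section 4 below. As a taste of the other mechanisms, here is a minimal `Pipe` sketch (the message contents are illustrative):

```python
import multiprocessing

def child(conn):
    # Receive a request from the parent and send a reply back
    request = conn.recv()
    conn.send(f"processed: {request}")
    conn.close()

if __name__ == "__main__":
    # Pipe() returns the two ends of a duplex channel
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()

    parent_conn.send("hello")
    print(parent_conn.recv())  # -> processed: hello
    p.join()
```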
3. Three Implementation Methods in Detail
3.1 Method 1: Basic Combination
```python
import multiprocessing
import threading
import time

def thread_task(thread_id):
    """Thread worker function."""
    print(f"Thread {thread_id} running in process {multiprocessing.current_process().name}")
    time.sleep(2)
    return f"Thread {thread_id} done"

def process_task():
    """Process worker function."""
    print(f"Process {multiprocessing.current_process().name} started")

    # Create and start several threads
    threads = []
    for i in range(4):
        t = threading.Thread(target=thread_task, args=(i,))
        threads.append(t)
        t.start()

    # Wait for all threads to finish
    for t in threads:
        t.join()

    print(f"Process {multiprocessing.current_process().name} finished")

if __name__ == "__main__":
    # Create 3 subprocesses
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=process_task, name=f"SubProcess-{i}")
        processes.append(p)
        p.start()

    # Wait for all subprocesses to finish
    for p in processes:
        p.join()

    print("All processes finished")
```
3.2 Method 2: Process Pools and Thread Pools
```python
import concurrent.futures
import multiprocessing
import threading
import time

def thread_worker(data):
    """Thread-pool worker function."""
    process_name = multiprocessing.current_process().name
    thread_name = threading.current_thread().name
    time.sleep(0.5)  # Simulate a workload
    return f"{process_name}-{thread_name} processed: {data}"

def process_worker(data_chunk):
    """Process-pool worker function."""
    print(f"Process {multiprocessing.current_process().name} handling {len(data_chunk)} items")

    # Process the chunk with a thread pool
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(thread_worker, data_chunk))

    return results

if __name__ == "__main__":
    # Prepare the data
    all_data = [f"data_{i}" for i in range(20)]
    chunk_size = 5
    data_chunks = [all_data[i:i+chunk_size] for i in range(0, len(all_data), chunk_size)]

    # Use a process pool
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_worker, chunk) for chunk in data_chunks]

        # Collect the results
        all_results = []
        for future in concurrent.futures.as_completed(futures):
            all_results.extend(future.result())

    print("Done; results:")
    for result in all_results:
        print(f"  {result}")
```
3.3 Method 3: A Custom Process Class
```python
import multiprocessing
import threading
import time

class ThreadedProcess(multiprocessing.Process):
    def __init__(self, task_id, data_list):
        super().__init__()
        self.task_id = task_id
        self.data_list = data_list
        # A managed list lets the parent read results appended in the child
        self.results = multiprocessing.Manager().list()

    def run(self):
        print(f"Process {self.name} starting task {self.task_id}")

        # Create one thread per data item
        threads = []
        for i, data in enumerate(self.data_list):
            thread = threading.Thread(
                target=self.process_item,
                args=(data, i)
            )
            threads.append(thread)
            thread.start()

        # Wait for all threads to finish
        for thread in threads:
            thread.join()

        print(f"Process {self.name} finished task {self.task_id}")

    def process_item(self, data, index):
        """Process a single data item."""
        thread_name = threading.current_thread().name
        print(f"Thread {thread_name} processing: {data}")
        time.sleep(0.3)  # Simulate processing time

        # Process the item and record the result
        result = {
            'index': index,
            'data': data,
            'processed': data.upper(),  # Example transformation
            'thread': thread_name,
            'process': self.name
        }
        self.results.append(result)

if __name__ == "__main__":
    # Prepare the task data
    tasks = [
        (1, ['apple', 'banana', 'cherry']),
        (2, ['dog', 'elephant', 'fox']),
        (3, ['green', 'blue', 'red', 'yellow'])
    ]

    # Create and start the processes
    processes = []
    for task_id, data_list in tasks:
        process = ThreadedProcess(task_id, data_list)
        processes.append(process)
        process.start()

    # Wait for all processes to finish
    for process in processes:
        process.join()

    # Aggregate the results
    all_results = []
    for process in processes:
        all_results.extend(list(process.results))

    print("\nAll tasks finished; aggregated results:")
    for result in all_results:
        print(f"  Item {result['index']}: {result['data']} -> {result['processed']}")
```
4. Inter-Process Communication in Practice
4.1 Using a Queue for Inter-Process Communication
```python
import multiprocessing
import threading
import time
import random
from queue import Empty  # Raised by multiprocessing.Queue.get on timeout

def producer(queue, producer_id):
    """Producer thread function."""
    for i in range(5):
        item = f"producer{producer_id}-item{i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    queue.put(f"producer{producer_id}-done")

def consumer_process(queue, consumer_id):
    """Consumer process function."""
    print(f"Consumer process {consumer_id} started")
    completed_producers = 0
    # 4 "done" sentinels are produced in total; each consumer stops after 2
    # (sentinels are not guaranteed to be spread evenly, hence the timeout)
    total_producers = 2

    while completed_producers < total_producers:
        try:
            item = queue.get(timeout=5)
            if item.endswith("-done"):
                completed_producers += 1
                print(f"Consumer {consumer_id} received done signal: {item}")
            else:
                print(f"Consumer {consumer_id} processing: {item}")
                time.sleep(random.uniform(0.2, 0.8))  # Simulate processing time
        except Empty:
            print(f"Consumer {consumer_id} timed out waiting")
            break

    print(f"Consumer process {consumer_id} finished")

def producer_process(queue, process_id):
    """Producer process function."""
    print(f"Producer process {process_id} started")

    # Create several producer threads inside this process
    producer_threads = []
    for i in range(2):  # 2 producer threads per process
        thread = threading.Thread(
            target=producer,
            args=(queue, f"P{process_id}-T{i}")
        )
        producer_threads.append(thread)
        thread.start()

    # Wait for all producer threads to finish
    for thread in producer_threads:
        thread.join()

    print(f"Producer process {process_id} finished")

if __name__ == "__main__":
    # Create the inter-process queue
    queue = multiprocessing.Queue(maxsize=10)

    # Create the producer processes
    producer_processes = []
    for i in range(2):
        p = multiprocessing.Process(
            target=producer_process,
            args=(queue, i)
        )
        producer_processes.append(p)
        p.start()

    # Create the consumer processes
    consumer_processes = []
    for i in range(2):
        c = multiprocessing.Process(
            target=consumer_process,
            args=(queue, i)
        )
        consumer_processes.append(c)
        c.start()

    # Wait for all processes to finish
    for p in producer_processes:
        p.join()
    for c in consumer_processes:
        c.join()

    print("All produce/consume work finished")
```
5. Performance Optimization and Best Practices
5.1 Resource Management Strategies
1. Set sensible process and thread counts:
```python
import os

# Scale the process count to the number of CPU cores
cpu_count = os.cpu_count()
process_pool_size = max(1, cpu_count - 1)  # Leave one core for the system

task_type = "io_intensive"  # Example value; set this from your workload

# Scale the thread count to the task type
if task_type == "io_intensive":
    thread_pool_size = 10  # I/O-bound work tolerates more threads
else:
    thread_pool_size = cpu_count  # CPU-bound work should not over-subscribe
```
2. Manage resources with connection pools:
```python
from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor
import database  # Hypothetical database module

# Process-level connection pool
process_conn_pool = None

def init_process():
    global process_conn_pool
    process_conn_pool = database.ConnectionPool(max_connections=5)

def thread_task(query):
    # Borrow a connection from the process-level pool
    conn = process_conn_pool.get_connection()
    try:
        result = conn.execute(query)
        return result
    finally:
        process_conn_pool.release_connection(conn)

def process_worker(queries):
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(thread_task, queries))
    return results

if __name__ == "__main__":
    queries = [f"SELECT * FROM table WHERE id = {i}" for i in range(10)]

    with Pool(processes=2, initializer=init_process) as pool:
        results = pool.map(process_worker, [queries[:5], queries[5:]])
```
5.2 Error Handling and Retry Mechanisms
```python
import multiprocessing
import threading
import time
import random
from functools import wraps

def retry(max_attempts=3, delay=1):
    """Retry decorator."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    print(f"Attempt {attempts} failed: {e}")
                    if attempts < max_attempts:
                        time.sleep(delay)
                    else:
                        raise
        return wrapper
    return decorator

@retry(max_attempts=3, delay=2)
def reliable_thread_task(data):
    """A thread task with retries."""
    # Simulate an operation that can fail
    if random.random() < 0.3:  # 30% chance of failure
        raise ValueError("Random failure")
    time.sleep(0.5)
    return f"Successfully processed: {data}"

def robust_process():
    """A process function with error handling."""
    try:
        threads = []
        results = []

        for i in range(5):
            # Bind i as a default argument so each thread sees its own value
            t = threading.Thread(
                target=lambda i=i: results.append(reliable_thread_task(f"data-{i}"))
            )
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        print(f"Results: {results}")
    except Exception as e:
        print(f"Process failed: {e}")
        # More elaborate error handling could go here

if __name__ == "__main__":
    processes = []
    for i in range(2):
        p = multiprocessing.Process(target=robust_process)
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
```
6. Practical Application: Handling Web Service Requests
```python
import multiprocessing
import threading
import time
import random
from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Simulate processing time
        processing_time = random.uniform(0.1, 1.0)
        time.sleep(processing_time)

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        response = {
            'path': self.path,
            'processing_time': processing_time,
            'process': multiprocessing.current_process().name,
            'thread': threading.current_thread().name
        }
        self.wfile.write(json.dumps(response).encode())

def run_server(port):
    """Run an HTTP server."""
    server = HTTPServer(('localhost', port), RequestHandler)
    print(f"Server started in process {multiprocessing.current_process().name} on port {port}")
    server.serve_forever()

def health_checker(server_ports):
    """Health-check thread."""
    while True:
        time.sleep(5)
        print(f"Health check: server process running, monitoring ports: {server_ports}")

def server_process(port):
    """Server process function."""
    # Server thread
    server_thread = threading.Thread(
        target=run_server,
        args=(port,),
        daemon=True
    )

    # Health-check thread
    health_thread = threading.Thread(
        target=health_checker,
        args=([port],),
        daemon=True
    )

    server_thread.start()
    health_thread.start()

    # Block until the server thread exits
    server_thread.join()

if __name__ == "__main__":
    # Start several server processes, each listening on its own port
    ports = [8000, 8001, 8002]
    processes = []

    for port in ports:
        p = multiprocessing.Process(
            target=server_process,
            args=(port,),
            name=f"ServerProcess-{port}"
        )
        processes.append(p)
        p.start()

    print(f"Started {len(processes)} server processes")

    try:
        # Keep the main process alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down servers...")
        for p in processes:
            p.terminate()
        for p in processes:
            p.join()
        print("All servers stopped")
```
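Once the servers are up, a request such as `curl http://localhost:8000/anything` should return a JSON body naming the process and thread that handled it, which makes the process/thread split easy to observe.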
That concludes this complete guide to creating threads inside Python subprocesses. For more material on Python subprocesses and multithreading, see the other related articles on 脚本之家!