Python子进程中创建多线程的完整指南
作者:Yant224
在操作系统中,进程是资源分配的基本单位,每个进程都有独立的内存空间、文件描述符等系统资源,而线程是进程内的执行单元,多个线程共享同一进程的资源,本文给大家介绍了Python子进程中创建多线程的完整指南,需要的朋友可以参考下
一、理解进程与线程的关系
1.1 进程与线程的基本概念
在操作系统中,进程是资源分配的基本单位,每个进程都有独立的内存空间、文件描述符等系统资源。而线程是进程内的执行单元,多个线程共享同一进程的资源。
Python中的特殊之处在于**全局解释器锁(GIL)**的存在,这使得在(标准CPython解释器的)单个进程中,多线程无法真正并行执行CPU密集型任务。但线程在等待I/O(网络、磁盘、sleep等)时会释放GIL,其他线程得以继续运行,因此在I/O密集型任务中,多线程仍然能显著提升性能。
1.2 子进程内多线程的架构模型
整体结构为:主进程创建多个子进程,每个子进程内部再创建多个线程,由这些线程执行具体任务。
这种架构的优势在于:
- 充分利用多核CPU:每个子进程可以在不同的CPU核心上运行
- 资源共享与隔离平衡:线程共享进程资源,进程间资源隔离
- 灵活的任务分配:可以根据任务特性选择进程级或线程级并行
二、实现原理与技术细节
2.1 Python的多进程模块
Python提供了multiprocessing模块来创建和管理进程:
import multiprocessing
import os
def worker():
    """Print this process's PID together with its multiprocessing name."""
    current = multiprocessing.current_process()
    pid = os.getpid()
    print(f"进程ID: {pid}, 进程名称: {current.name}")
if __name__ == "__main__":
    # Launch three named worker processes, then wait for every one of them.
    processes = []
    for idx in range(3):
        proc = multiprocessing.Process(target=worker, name=f"Process-{idx}")
        proc.start()
        processes.append(proc)
    for proc in processes:
        proc.join()
2.2 进程间通信(IPC)机制
由于进程有独立的内存空间,必须使用特殊的通信机制:
| 通信方式 | 描述 | 适用场景 |
|---|---|---|
| Queue | 先进先出的队列 | 生产者-消费者模式 |
| Pipe | 双向通信通道 | 一对一通信 |
| Shared Memory(Value / Array / shared_memory) | 共享内存区域 | 高性能数据共享 |
| Manager | 托管共享对象 | 复杂数据结构共享 |
三、三种实现方法详解
3.1 方法一:基础组合方式
import multiprocessing
import threading
import time
def thread_task(thread_id, delay=2):
    """Simulate one blocking unit of work on a worker thread.

    Args:
        thread_id: Identifier used in the log line and in the return value.
        delay: Seconds to sleep to simulate blocking I/O (default 2, the
            value the example always used).

    Returns:
        The completion message ``"线程 <thread_id> 完成"``. Note that
        ``threading.Thread`` discards its target's return value, so a caller
        that needs it must capture it explicitly.
    """
    print(f"线程 {thread_id} 在进程 {multiprocessing.current_process().name} 中运行")
    time.sleep(delay)
    return f"线程 {thread_id} 完成"
def process_task(num_threads=4):
    """Run ``num_threads`` copies of ``thread_task`` concurrently in this process.

    ``threading.Thread`` throws away its target's return value, so each
    thread writes its result into its own slot of a pre-sized list. Distinct
    slots mean no lock is needed, and results end up in thread-id order.

    Args:
        num_threads: Number of threads to start (default 4, as before).

    Returns:
        List of the messages returned by ``thread_task``, indexed by thread
        id. A slot stays ``None`` if its thread raised. When this function is
        a ``multiprocessing.Process`` target, the return value is ignored; it
        is useful when the function is called directly.
    """
    name = multiprocessing.current_process().name
    print(f"进程 {name} 启动")

    results = [None] * num_threads

    def _run(slot):
        # Capture the return value that threading.Thread would otherwise drop.
        results[slot] = thread_task(slot)

    threads = [threading.Thread(target=_run, args=(i,)) for i in range(num_threads)]
    for t in threads:
        t.start()
    # Wait for every thread before reporting.
    for t in threads:
        t.join()

    for message in results:
        print(message)
    print(f"进程 {name} 结束")
    return results
if __name__ == "__main__":
    # Three child processes; each one starts its own group of threads.
    subprocesses = []
    for n in range(3):
        child = multiprocessing.Process(target=process_task, name=f"SubProcess-{n}")
        child.start()
        subprocesses.append(child)
    # Block until every child process has exited.
    for child in subprocesses:
        child.join()
    print("所有进程完成")
3.2 方法二:使用进程池和线程池
import concurrent.futures
import multiprocessing
import threading
import time
def thread_worker(data, delay=0.5):
    """Process one item on a thread-pool thread.

    Args:
        data: The item to process; only interpolated into the result string.
        delay: Seconds to sleep to simulate work (default 0.5, as before).

    Returns:
        ``"<process name>-<thread name> 处理: <data>"``, identifying which
        process and thread handled the item.
    """
    process_name = multiprocessing.current_process().name
    thread_name = threading.current_thread().name
    time.sleep(delay)  # simulated workload
    return f"{process_name}-{thread_name} 处理: {data}"
def process_worker(data_chunk):
    """Fan one chunk of items out over a 4-thread pool inside this process.

    Returns the per-item results from ``thread_worker`` in input order
    (``Executor.map`` preserves ordering). The pool is shut down, waiting
    for its threads, before the list is returned.
    """
    print(f"进程 {multiprocessing.current_process().name} 开始处理 {len(data_chunk)} 个项目")
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    with pool:
        return list(pool.map(thread_worker, data_chunk))
if __name__ == "__main__":
    # Twenty demo items split into chunks of five, one chunk per task.
    all_data = [f"data_{i}" for i in range(20)]
    chunk_size = 5
    data_chunks = [all_data[i:i + chunk_size] for i in range(0, len(all_data), chunk_size)]

    all_results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        # executor.map yields results in submission order, so the report below
        # is the same on every run (as_completed yields in completion order).
        for chunk_results in executor.map(process_worker, data_chunks):
            all_results.extend(chunk_results)

    print("处理完成,结果:")
    for result in all_results:
        print(f" {result}")
3.3 方法三:自定义进程类
import multiprocessing
import threading
import time
class ThreadedProcess(multiprocessing.Process):
    """A process whose ``run`` handles each item of ``data_list`` on its own thread.

    Result dicts are appended to ``self.results``. ``run`` executes in the
    child process, so for the parent to see the results afterwards,
    ``results`` must be shared across processes, e.g. a
    ``multiprocessing.Manager().list()`` proxy.

    Args:
        task_id: Identifier of this task; copied into every result dict.
        data_list: Items to process; one thread is started per item. Items
            must support ``.upper()`` (strings in the demo).
        results: Optional list to append into. Passing one proxy from a
            single shared Manager to several instances avoids starting one
            Manager server process per instance. When omitted, a new Manager
            is created for this instance (the original behaviour).
    """

    def __init__(self, task_id, data_list, results=None):
        super().__init__()
        self.task_id = task_id
        self.data_list = data_list
        if results is None:
            # Starts a dedicated manager server process for this instance.
            results = multiprocessing.Manager().list()
        self.results = results

    def run(self):
        """Start one thread per item and wait for all of them (runs in the child)."""
        print(f"进程 {self.name} 开始处理任务 {self.task_id}")
        threads = [
            threading.Thread(target=self.process_item, args=(data, i))
            for i, data in enumerate(self.data_list)
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        print(f"进程 {self.name} 完成任务 {self.task_id}")

    def process_item(self, data, index):
        """Process one item and append a result dict to ``self.results``.

        Args:
            data: The item; its upper-cased form is the "processed" value.
            index: Position of ``data`` within ``data_list``.
        """
        thread_name = threading.current_thread().name
        print(f"线程 {thread_name} 处理: {data}")
        time.sleep(0.3)  # simulated processing time
        self.results.append({
            'task_id': self.task_id,
            'index': index,
            'data': data,
            'processed': data.upper(),
            'thread': thread_name,
            'process': self.name,
        })
if __name__ == "__main__":
    # (task_id, items) pairs; each pair is handled by one ThreadedProcess.
    tasks = [
        (1, ['apple', 'banana', 'cherry']),
        (2, ['dog', 'elephant', 'fox']),
        (3, ['green', 'blue', 'red', 'yellow']),
    ]

    processes = []
    for task_id, data_list in tasks:
        process = ThreadedProcess(task_id, data_list)
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    # Threads append results in completion order; sort each process's results
    # by item index so the summary is stable from run to run.
    all_results = []
    for process in processes:
        all_results.extend(sorted(process.results, key=lambda r: r['index']))

    print("\n所有任务完成,结果汇总:")
    for result in all_results:
        # 'index' is the item's position inside its task's data_list, not a task
        # number, so label each line with the owning process and the item index.
        print(f" {result['process']} 项目{result['index']}: {result['data']} -> {result['processed']}")
四、进程间通信实战
4.1 使用Queue进行进程间通信
import multiprocessing
import threading
import time
import random
def producer(queue, producer_id, num_items=5, delay_range=(0.1, 0.5)):
    """Put ``num_items`` data items on ``queue``, then a completion marker.

    Args:
        queue: Any object with a ``put`` method (a ``multiprocessing.Queue``
            in this example).
        producer_id: Label embedded in every item.
        num_items: Number of data items to produce (default 5).
        delay_range: ``(low, high)`` bounds in seconds for the random pause
            after each item (default ``(0.1, 0.5)``).

    The last item put is ``"生产者<producer_id>-完成"``; consumers recognise
    it by its ``"-完成"`` suffix.
    """
    low, high = delay_range
    for i in range(num_items):
        item = f"生产者{producer_id}-项目{i}"
        queue.put(item)
        print(f"生产: {item}")
        time.sleep(random.uniform(low, high))
    queue.put(f"生产者{producer_id}-完成")
def consumer_process(queue, consumer_id, total_producers=2, timeout=5):
    """Consume items from ``queue`` until ``total_producers`` completion markers arrive.

    Args:
        queue: Queue shared with the producers (a ``multiprocessing.Queue``
            here). This parameter shadows the stdlib ``queue`` module inside
            the function, so ``Empty`` is imported by name below.
        consumer_id: Label used in log output.
        total_producers: How many ``"-完成"`` markers to wait for before
            stopping (default 2, as before).
        timeout: Seconds to wait for each item before giving up (default 5).

    Returns:
        The number of data (non-marker) items this consumer processed.
    """
    # Both queue.Queue.get and multiprocessing.Queue.get raise the stdlib
    # queue.Empty on timeout; it is not an attribute of the queue object.
    from queue import Empty

    print(f"消费者进程 {consumer_id} 启动")
    completed_producers = 0
    processed = 0
    while completed_producers < total_producers:
        try:
            item = queue.get(timeout=timeout)
        except Empty:
            print(f"消费者{consumer_id} 等待超时")
            break
        if item.endswith("-完成"):
            completed_producers += 1
            print(f"消费者{consumer_id} 收到完成信号: {item}")
        else:
            print(f"消费者{consumer_id} 处理: {item}")
            time.sleep(random.uniform(0.2, 0.8))  # simulated processing time
            processed += 1
    print(f"消费者进程 {consumer_id} 结束")
    return processed
def producer_process(queue, process_id):
    """Start two producer threads in this process and wait for both to finish.

    Thread ``n`` is labelled ``"P<process_id>-T<n>"`` and puts its items on
    the shared ``queue``.
    """
    print(f"生产者进程 {process_id} 启动")
    workers = [
        threading.Thread(target=producer, args=(queue, f"P{process_id}-T{n}"))
        for n in range(2)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(f"生产者进程 {process_id} 结束")
if __name__ == "__main__":
    # One bounded queue shared by every producer and consumer process.
    queue = multiprocessing.Queue(maxsize=10)

    producer_processes = [
        multiprocessing.Process(target=producer_process, args=(queue, i))
        for i in range(2)
    ]
    for proc in producer_processes:
        proc.start()

    consumer_processes = [
        multiprocessing.Process(target=consumer_process, args=(queue, i))
        for i in range(2)
    ]
    for proc in consumer_processes:
        proc.start()

    # Producers first, then consumers, as before.
    for proc in producer_processes + consumer_processes:
        proc.join()
    print("所有生产消费任务完成")
五、性能优化与最佳实践
5.1 资源管理策略
1.合理设置进程和线程数量:
import os
# Size the process pool from the number of CPU cores, leaving one for the system.
# os.cpu_count() returns None when the count cannot be determined, so fall back to 1.
cpu_count = os.cpu_count() or 1
process_pool_size = max(1, cpu_count - 1)

# Size the thread pool by workload type. Example value: use "io_intensive" for
# I/O-bound work; any other value is treated as CPU-bound.
task_type = "io_intensive"
if task_type == "io_intensive":
    thread_pool_size = 10  # I/O-bound: threads mostly wait, so more of them help
else:
    thread_pool_size = cpu_count  # CPU-bound: extra threads add little under the GIL
2.使用连接池管理资源:
from multiprocessing import Pool
import threading
import database # 假设的数据库模块
# Connection pool owned by the current process. Each Pool worker sets it once
# via init_process(); it stays None in any process that never ran the initializer.
process_conn_pool = None


def init_process():
    """``Pool`` initializer: create the connection pool for this worker process."""
    global process_conn_pool
    pool = database.ConnectionPool(max_connections=5)
    process_conn_pool = pool
def thread_task(query):
    """Execute ``query`` on a connection borrowed from this process's pool.

    The connection is returned to the pool even if ``execute`` raises.
    """
    conn = process_conn_pool.get_connection()
    try:
        return conn.execute(query)
    finally:
        process_conn_pool.release_connection(conn)


def process_worker(queries, max_workers=3):
    """Run ``queries`` concurrently on a thread pool inside this worker process.

    Args:
        queries: Iterable of query strings, each handed to ``thread_task``.
        max_workers: Thread-pool size (default 3, as before).

    Returns:
        List of query results in input order.
    """
    # ThreadPoolExecutor lives in concurrent.futures; the threading module has
    # no such attribute, so ``threading.ThreadPoolExecutor`` raises AttributeError.
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(thread_task, queries))
if __name__ == "__main__":
    # Demo queries built from trusted integers only; real user input must use
    # the database driver's parameter binding, never string formatting.
    queries = [f"SELECT * FROM table WHERE id = {i}" for i in range(10)]
    half = len(queries) // 2
    with Pool(processes=2, initializer=init_process) as pool:
        # One batch of queries per worker process.
        results = pool.map(process_worker, [queries[:half], queries[half:]])
5.2 错误处理与重试机制
import multiprocessing
import threading
import time
from functools import wraps
def retry(max_attempts=3, delay=1):
    """Decorator factory that retries the wrapped call on any ``Exception``.

    The call is attempted up to ``max_attempts`` times, sleeping ``delay``
    seconds between attempts. Every failure is printed, and the exception
    from the final attempt is re-raised. If ``max_attempts`` is less than 1,
    the function is never called and the wrapper returns None.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while attempt < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    attempt += 1
                    print(f"尝试 {attempt} 失败: {exc}")
                    if attempt >= max_attempts:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator
@retry(max_attempts=3, delay=2)
def reliable_thread_task(data):
    """Process ``data``, failing at random about 30% of the time to exercise ``retry``.

    Returns:
        ``"成功处理: <data>"`` on success.

    Raises:
        ValueError: when all three attempts fail (re-raised by ``retry``).
    """
    # Imported locally: this example's import list (multiprocessing, threading,
    # time, functools.wraps) does not include random.
    import random

    if random.random() < 0.3:  # 30% chance of failure
        raise ValueError("随机失败")
    time.sleep(0.5)  # simulated work
    return f"成功处理: {data}"
def robust_process():
    """Run ``reliable_thread_task`` on five threads and print the outcomes.

    An exception raised inside a thread never reaches the thread that started
    it, so a try/except around ``start()``/``join()`` cannot observe task
    failures. Each worker therefore catches its own exception and records
    it. The outer handler only covers errors while creating or starting
    threads.
    """
    results = []
    failures = []

    def _run(item):
        # Runs on a worker thread. Record the outcome here; otherwise the
        # exception goes to threading.excepthook and is lost to this function.
        try:
            value = reliable_thread_task(item)
        except Exception as exc:
            failures.append((item, exc))
        else:
            results.append(value)

    try:
        # Pass each item through args= so it is bound now; a lambda closing
        # over the loop variable would read ``i`` whenever the thread runs.
        threads = [
            threading.Thread(target=_run, args=(f"data-{i}",))
            for i in range(5)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print(f"处理结果: {results}")
        for item, exc in failures:
            print(f"处理失败: {item}: {exc}")
    except Exception as e:
        print(f"进程失败: {e}")
if __name__ == "__main__":
    # Two independent processes, each running robust_process.
    workers = []
    for _ in range(2):
        proc = multiprocessing.Process(target=robust_process)
        proc.start()
        workers.append(proc)
    for proc in workers:
        proc.join()
六、实战应用:Web服务请求处理
import multiprocessing
import threading
import time
import random
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
class RequestHandler(BaseHTTPRequestHandler):
    """Answer every GET with JSON naming the path, delay, process and thread."""

    def do_GET(self):
        # Pretend the request takes between 0.1 and 1.0 seconds to handle.
        delay = random.uniform(0.1, 1.0)
        time.sleep(delay)

        body = json.dumps({
            'path': self.path,
            'processing_time': delay,
            'process': multiprocessing.current_process().name,
            'thread': threading.current_thread().name,
        })

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(body.encode())
def run_server(port):
    """Serve ``RequestHandler`` on localhost:``port``; blocks until the server stops.

    Each request is handled on its own thread (``socketserver.ThreadingMixIn``),
    so a slow request does not block other clients of this process. This is
    the "threads inside a child process" model this example demonstrates.

    Args:
        port: TCP port to bind on localhost.
    """
    from socketserver import ThreadingMixIn

    class ThreadingServer(ThreadingMixIn, HTTPServer):
        # In-flight request threads must not keep the process alive on exit.
        daemon_threads = True

    server = ThreadingServer(('localhost', port), RequestHandler)
    print(f"服务器在进程 {multiprocessing.current_process().name} 中启动,端口: {port}")
    try:
        server.serve_forever()
    finally:
        # Release the listening socket if serve_forever() ever returns or raises.
        server.server_close()
def health_checker(server_ports):
    """Print a heartbeat naming ``server_ports`` every 5 seconds; never returns.

    NOTE(review): this only logs; it does not actually probe the servers.
    """
    interval_seconds = 5
    message = f"健康检查: 服务器进程正常运行,监控端口: {{}}"
    while True:
        time.sleep(interval_seconds)
        print(message.format(server_ports))
def server_process(port):
    """Entry point of one server process: an HTTP server thread plus a heartbeat thread.

    Both threads are daemons. The function returns, ending the process, only
    once the server thread exits.
    """
    workers = {
        'server': threading.Thread(target=run_server, args=(port,), daemon=True),
        'health': threading.Thread(target=health_checker, args=([port],), daemon=True),
    }
    for worker_thread in workers.values():
        worker_thread.start()
    # Keep the process alive for as long as the server thread runs.
    workers['server'].join()
if __name__ == "__main__":
    # One server process per port.
    ports = [8000, 8001, 8002]
    processes = [
        multiprocessing.Process(target=server_process, args=(port,), name=f"ServerProcess-{port}")
        for port in ports
    ]
    for proc in processes:
        proc.start()
    print(f"启动了 {len(processes)} 个服务器进程")

    try:
        # Keep the parent alive until Ctrl+C.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("正在关闭服务器...")
        for proc in processes:
            proc.terminate()
        for proc in processes:
            proc.join()
        print("所有服务器已关闭")
以上就是Python子进程中创建多线程的完整指南的详细内容,更多关于Python子进程创建多线程的资料请关注脚本之家其它相关文章!
