一文带你搞懂Python中死锁问题的预防与处理
作者:铭渊老黄
一、那个让服务器沉默的夜晚
凌晨两点,告警短信把你从睡梦中惊醒:线上服务无响应,所有请求超时。你登上服务器,CPU 空转,内存正常,日志停在某个普通的时间戳——然后什么都没有了。
重启服务,恢复正常。第二天复盘,你发现是两个线程互相等待对方释放锁,谁都没有办法继续,系统就这样安静地"死"了。
这就是死锁(Deadlock)。它不像崩溃那样嘈杂,不像内存泄漏那样留下痕迹,它只是让你的程序永远等待下去,优雅而致命。
Python 并发编程的实战能力,很大程度上体现在你能否识别、预防和处理死锁。今天这篇文章,我们从理论到代码,彻底把死锁这个话题讲透。
二、死锁的四个必要条件
1965 年,计算机科学家 Coffman 等人总结出死锁发生的四个必要条件——缺少任何一个,死锁都不会发生。这四个条件是我们所有预防策略的理论基础。
条件一:互斥(Mutual Exclusion)
资源在某一时刻只能被一个线程持有。锁本身就是互斥的体现——这是锁存在的意义,通常无法消除。
条件二:持有并等待(Hold and Wait)
线程已经持有至少一个资源,同时在等待获取其他资源。这是死锁最典型的姿态。
条件三:不可剥夺(No Preemption)
线程持有的资源不能被强制剥夺,只能由持有者主动释放。操作系统层面的锁通常满足此条件。
条件四:循环等待(Circular Wait)
存在一个线程等待链:线程 A 等待线程 B 持有的资源,线程 B 等待线程 C 持有的资源……线程 N 等待线程 A 持有的资源,形成一个环。
让我们用代码重现经典的死锁场景:
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1():
print("线程1: 尝试获取 lock_a...")
with lock_a:
print("线程1: 已持有 lock_a,等待 lock_b...")
time.sleep(0.1) # 模拟处理时间,给线程2机会持有 lock_b
with lock_b: # ← 此处永远等待,lock_b 被线程2持有
print("线程1: 同时持有 lock_a 和 lock_b")
def thread_2():
print("线程2: 尝试获取 lock_b...")
with lock_b:
print("线程2: 已持有 lock_b,等待 lock_a...")
time.sleep(0.1)
with lock_a: # ← 此处永远等待,lock_a 被线程1持有
print("线程2: 同时持有 lock_a 和 lock_b")
t1 = threading.Thread(target=thread_1, name="Thread-1")
t2 = threading.Thread(target=thread_2, name="Thread-2")
t1.start()
t2.start()
t1.join() # 主线程永远阻塞在这里
t2.join()
print("这行代码永远不会被执行") # ← 程序已死锁
四个条件全部满足:锁是互斥的(条件1),每个线程持有一个锁并等待另一个(条件2),锁不会被强制释放(条件3),T1→T2→T1 形成循环等待(条件4)。
三、死锁的检测:发现已经发生的死锁
在工程实践中,有时死锁难以完全避免,这时就需要检测机制——定期扫描系统状态,发现死锁后采取恢复措施。
资源分配图检测算法
死锁检测的核心是构建资源分配图,然后查找图中的环。
from collections import defaultdict
from typing import Optional
class DeadlockDetector:
"""
基于资源分配图的死锁检测器。
维护两种关系:
- allocation: 线程当前持有的锁(已分配)
- waiting: 线程正在等待的锁
"""
def __init__(self):
# {thread_id: lock_id} 线程持有的锁
self.allocation: dict[str, str] = {}
# {thread_id: lock_id} 线程等待的锁
self.waiting: dict[str, str] = {}
def thread_acquired(self, thread_id: str, lock_id: str):
"""记录线程成功获取锁"""
self.allocation[thread_id] = lock_id
# 从等待列表移除
self.waiting.pop(thread_id, None)
def thread_waiting(self, thread_id: str, lock_id: str):
"""记录线程正在等待锁"""
self.waiting[thread_id] = lock_id
def thread_released(self, thread_id: str):
"""记录线程释放锁"""
self.allocation.pop(thread_id, None)
def _find_lock_holder(self, lock_id: str) -> Optional[str]:
"""找到持有某个锁的线程"""
for tid, lid in self.allocation.items():
if lid == lock_id:
return tid
return None
def detect_deadlock(self) -> Optional[list[str]]:
"""
检测是否存在死锁,返回死锁环中的线程列表。
算法:对每个等待中的线程,沿等待链追踪,
如果回到起点则发现死锁。
"""
for start_thread in self.waiting:
visited = []
current = start_thread
while current is not None:
if current in visited:
# 找到环!从环的起点截取
cycle_start = visited.index(current)
return visited[cycle_start:] + [current]
visited.append(current)
# 当前线程在等待哪个锁?
waiting_for_lock = self.waiting.get(current)
if not waiting_for_lock:
break # 不再等待,没有死锁(沿此路径)
# 谁持有那个锁?
current = self._find_lock_holder(waiting_for_lock)
return None # 未检测到死锁
def status_report(self) -> str:
lines = ["=== 资源分配状态 ==="]
lines.append("持有情况:")
for tid, lid in self.allocation.items():
waiting = self.waiting.get(tid, "无")
lines.append(f" {tid} 持有 {lid},等待 {waiting}")
deadlock = self.detect_deadlock()
if deadlock:
lines.append(f"\n⚠️ 检测到死锁!环路: {' → '.join(deadlock)}")
else:
lines.append("\n✅ 未检测到死锁")
return "\n".join(lines)
# 演示死锁检测
detector = DeadlockDetector()
# 模拟死锁场景
detector.thread_acquired("Thread-1", "Lock-A")
detector.thread_acquired("Thread-2", "Lock-B")
detector.thread_waiting("Thread-1", "Lock-B") # T1 等待被 T2 持有的 Lock-B
detector.thread_waiting("Thread-2", "Lock-A") # T2 等待被 T1 持有的 Lock-A
print(detector.status_report())
# 输出:
# === 资源分配状态 ===
# 持有情况:
# Thread-1 持有 Lock-A,等待 Lock-B
# Thread-2 持有 Lock-B,等待 Lock-A
# ⚠️ 检测到死锁!环路: Thread-1 → Thread-2 → Thread-1
四、死锁的预防:破坏四个条件之一
理论上,破坏四个必要条件中的任意一个,就能预防死锁。实践中,最常用的是以下几种策略。
策略一:固定加锁顺序(破坏循环等待)
这是最简单也最实用的预防策略。规定所有线程必须以相同的顺序获取锁,循环等待就不可能形成。
import threading
from contextlib import contextmanager
lock_a = threading.Lock()
lock_b = threading.Lock()
# 给每个锁分配唯一 ID,用于排序
LOCK_ORDER = {id(lock_a): 0, id(lock_b): 1}
@contextmanager
def acquire_locks_in_order(*locks):
"""
按固定顺序获取多个锁,彻底消除循环等待。
无论调用者以什么顺序传入锁,内部都会按 ID 排序。
"""
sorted_locks = sorted(locks, key=lambda l: id(l))
acquired = []
try:
for lock in sorted_locks:
lock.acquire()
acquired.append(lock)
yield
finally:
# 逆序释放
for lock in reversed(acquired):
lock.release()
def safe_thread_1():
with acquire_locks_in_order(lock_a, lock_b): # 传入顺序无关紧要
print("线程1: 安全地同时持有两个锁")
def safe_thread_2():
with acquire_locks_in_order(lock_b, lock_a): # 传入顺序不同,但内部会排序
print("线程2: 安全地同时持有两个锁")
t1 = threading.Thread(target=safe_thread_1)
t2 = threading.Thread(target=safe_thread_2)
t1.start(); t2.start()
t1.join(); t2.join()
print("程序正常结束,没有死锁!")
策略二:超时获取锁(破坏持有并等待)
使用 lock.acquire(timeout=N) 设置获取锁的超时时间,超时后放弃并重试,避免无限等待。
import threading
import time
import random
import logging
logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_with_timeout(first_lock, second_lock, name):
"""带超时的锁获取,失败后退避重试"""
max_retries = 5
for attempt in range(max_retries):
acquired_first = first_lock.acquire(timeout=0.5)
if not acquired_first:
logging.info(f"获取第一个锁超时,第 {attempt+1} 次重试")
time.sleep(random.uniform(0.01, 0.1)) # 随机退避,避免活锁
continue
try:
acquired_second = second_lock.acquire(timeout=0.5)
if not acquired_second:
logging.info(f"获取第二个锁超时,释放第一个锁,第 {attempt+1} 次重试")
first_lock.release()
time.sleep(random.uniform(0.01, 0.1))
continue
try:
# 成功同时持有两个锁
logging.info(f"✅ 成功获取两个锁,执行临界区操作")
time.sleep(0.2) # 模拟操作
return True
finally:
second_lock.release()
finally:
if first_lock.locked():
try:
first_lock.release()
except RuntimeError:
pass
logging.warning(f"⚠️ 达到最大重试次数,放弃执行")
return False
t1 = threading.Thread(target=thread_with_timeout, args=(lock_a, lock_b, "T1"), name="Thread-1")
t2 = threading.Thread(target=thread_with_timeout, args=(lock_b, lock_a, "T2"), name="Thread-2")
t1.start(); t2.start()
t1.join(); t2.join()
五、银行家算法:最优雅的死锁预防方案
以上策略都是"被动防御",而银行家算法(Banker’s Algorithm)是 Dijkstra 在 1965 年提出的主动安全分配策略——在分配资源之前,先判断分配后系统是否仍处于安全状态,如果不安全则拒绝分配。
核心概念
把操作系统比作一个银行,线程是客户,资源(锁、内存等)是贷款额度。银行规则:只有在保证所有客户最终都能得到满足的前提下,才批准贷款申请。
安全状态:存在一个执行顺序(安全序列),使得每个线程都能按需获得资源并最终完成,释放其持有的资源供后续线程使用。
from dataclasses import dataclass
from typing import Optional
import copy
@dataclass
class SystemState:
"""系统资源状态快照"""
n_threads: int # 线程数量
n_resources: int # 资源类型数量
available: list[int] # 当前可用资源向量
allocation: list[list[int]] # allocation[i][j]:线程i持有资源j的数量
max_need: list[list[int]] # max_need[i][j]:线程i最多需要资源j的数量
@property
def need(self) -> list[list[int]]:
"""计算每个线程还需要的资源量 = max_need - allocation"""
return [
[self.max_need[i][j] - self.allocation[i][j]
for j in range(self.n_resources)]
for i in range(self.n_threads)
]
class BankersAlgorithm:
"""
银行家算法实现。
在分配资源前,模拟分配后的状态,
若系统仍处于安全状态则批准,否则拒绝。
"""
def __init__(self, state: SystemState):
self.state = state
self.thread_names = [f"Thread-{i}" for i in range(state.n_threads)]
def find_safe_sequence(self, state: SystemState) -> Optional[list[int]]:
"""
安全性算法:寻找安全执行序列。
使用贪心策略:找到一个当前需求能被满足的线程,
模拟其执行完毕并释放资源,重复直到所有线程完成。
"""
available = state.available.copy()
need = state.need
finished = [False] * state.n_threads
safe_sequence = []
for _ in range(state.n_threads):
# 在未完成的线程中,找到需求能被当前可用资源满足的
found = False
for i in range(state.n_threads):
if finished[i]:
continue
# 检查线程 i 的所有资源需求是否都能被满足
if all(need[i][j] <= available[j] for j in range(state.n_resources)):
# 可以满足:模拟线程 i 执行完毕,释放其持有的资源
for j in range(state.n_resources):
available[j] += state.allocation[i][j]
finished[i] = True
safe_sequence.append(i)
found = True
break
if not found:
# 这轮没有任何线程能推进,系统处于不安全状态
return None
return safe_sequence # 返回安全序列
def request_resources(self, thread_id: int, request: list[int]) -> tuple[bool, str]:
"""
资源请求算法:线程申请资源。
返回 (是否批准, 原因说明)
"""
need = self.state.need
thread_name = self.thread_names[thread_id]
# 步骤1:检查请求是否超过声明的最大需求
if any(request[j] > need[thread_id][j] for j in range(self.state.n_resources)):
return False, f"❌ 拒绝:{thread_name} 的请求超过其声明的最大需求"
# 步骤2:检查请求是否超过当前可用资源
if any(request[j] > self.state.available[j] for j in range(self.state.n_resources)):
return False, f"⏳ 等待:{thread_name} 请求的资源暂不足,需要等待"
# 步骤3:试探性分配,检查分配后是否仍处于安全状态
# 创建试探状态(深拷贝)
trial_state = copy.deepcopy(self.state)
for j in range(self.state.n_resources):
trial_state.available[j] -= request[j]
trial_state.allocation[thread_id][j] += request[j]
trial_state.max_need[thread_id][j] -= request[j] # 更新剩余需求
safe_sequence = self.find_safe_sequence(trial_state)
if safe_sequence is None:
return False, (
f"🚫 拒绝:分配给 {thread_name} 后系统将进入不安全状态,"
f"存在死锁风险"
)
# 分配安全,提交试探状态
self.state = trial_state
seq_names = [self.thread_names[i] for i in safe_sequence]
return True, (
f"✅ 批准:分配后系统仍安全,"
f"安全执行序列: {' → '.join(seq_names)}"
)
def status_report(self):
"""打印系统当前状态"""
need = self.state.need
print("\n" + "="*60)
print("系统资源状态报告")
print("="*60)
print(f"可用资源: {self.state.available}")
print(f"\n{'线程':<12} {'已分配':<20} {'最大需求':<20} {'还需要':<20}")
print("-"*60)
for i in range(self.state.n_threads):
print(
f"{self.thread_names[i]:<12} "
f"{str(self.state.allocation[i]):<20} "
f"{str(self.state.max_need[i]):<20} "
f"{str(need[i]):<20}"
)
safe_seq = self.find_safe_sequence(self.state)
if safe_seq:
seq_names = [self.thread_names[i] for i in safe_seq]
print(f"\n安全状态: ✅ 安全序列: {' → '.join(seq_names)}")
else:
print("\n安全状态: ⚠️ 不安全!存在死锁风险")
print("="*60 + "\n")
完整演示
# 构建初始系统状态
# 3 个线程,4 种资源(A, B, C, D)
initial_state = SystemState(
n_threads=3,
n_resources=4,
available=[3, 2, 2, 1], # 当前可用资源
allocation=[ # 当前已分配
[0, 1, 0, 0], # Thread-0 已持有
[2, 0, 1, 1], # Thread-1 已持有
[0, 0, 1, 0], # Thread-2 已持有
],
max_need=[ # 最大需求声明
[3, 2, 1, 1], # Thread-0 最多需要
[4, 1, 2, 2], # Thread-1 最多需要
[1, 1, 2, 0], # Thread-2 最多需要
],
)
banker = BankersAlgorithm(initial_state)
banker.status_report()
# 场景1:Thread-0 请求 [1, 0, 1, 0]
print("Thread-0 请求资源 [1, 0, 1, 0]:")
approved, reason = banker.request_resources(0, [1, 0, 1, 0])
print(reason)
# 场景2:Thread-1 请求超出需求的资源
print("\nThread-1 请求资源 [3, 0, 0, 0](超出声明上限):")
approved, reason = banker.request_resources(1, [3, 0, 0, 0])
print(reason)
# 场景3:Thread-2 请求可能导致不安全状态的资源
print("\nThread-2 请求资源 [0, 1, 1, 0]:")
approved, reason = banker.request_resources(2, [0, 1, 1, 0])
print(reason)
banker.status_report()
典型输出:
Thread-0 请求资源 [1, 0, 1, 0]:
✅ 批准:分配后系统仍安全,安全执行序列: Thread-2 → Thread-0 → Thread-1
Thread-1 请求资源 [3, 0, 0, 0](超出声明上限):
❌ 拒绝:Thread-1 的请求超过其声明的最大需求
Thread-2 请求资源 [0, 1, 1, 0]:
🚫 拒绝:分配给 Thread-2 后系统将进入不安全状态,存在死锁风险
六、工程实践中的死锁防御工具箱
理论和算法之外,日常 Python 开发中有几个实用工具可以直接武装你的并发代码。
使用 threading.RLock 防止可重入死锁:
import threading
# 普通 Lock:同一线程两次获取会死锁
# lock = threading.Lock()
# RLock(可重入锁):同一线程可以多次获取,不会死锁
rlock = threading.RLock()
def recursive_function(n):
with rlock: # 同一线程第二次获取 RLock,安全!
if n > 0:
recursive_function(n - 1)
print(f"层级 {n} 执行完毕")
recursive_function(3) # 正常执行,不会死锁
使用 threading.Semaphore 限制并发度:
import threading
import time
# 信号量:限制同时访问某资源的线程数量
db_semaphore = threading.Semaphore(3) # 最多3个线程同时访问数据库
def query_database(thread_id):
with db_semaphore:
print(f"Thread-{thread_id}: 获得数据库连接")
time.sleep(1) # 模拟查询
print(f"Thread-{thread_id}: 释放数据库连接")
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(8)]
for t in threads: t.start()
for t in threads: t.join()
七、总结:死锁防御的层次体系
处理死锁是一个需要在多个层次同时发力的工程课题,可以用"预防优于检测,检测优于恢复"来概括我们的策略优先级。
预防层是最根本的,通过固定加锁顺序破坏循环等待,是代价最小、最可靠的手段。在设计阶段就约定好锁的获取顺序,后续所有代码遵守这个规范,绝大多数死锁问题都能在萌芽状态被消灭。
规避层是更智能的选择,银行家算法在每次资源分配时主动判断安全性,代价是需要维护全局资源状态,适合资源类型明确、线程数量可控的系统。
检测与恢复层是兜底机制,定期运行死锁检测算法,发现死锁后强制中止一个线程(选择代价最小的"牺牲者")并重试。
工具层则是日常编码的护城河:用 RLock 防止可重入死锁,用 acquire(timeout=N) 防止无限等待,用 Queue 代替手动锁管理,用 asyncio 的单线程协程模型从根本上消灭锁的需求。
并发编程是 Python 开发者的成人礼。每一次与死锁的正面交锋,都是对你系统设计能力的一次考验。掌握这些工具和思维框架,你就有了在黑暗中点灯的能力。
你遇到过最难排查的死锁是什么情况?用了什么手段最终定位到问题? 欢迎在评论区分享你的"侦探故事",这类经验对所有开发者都极有价值。
到此这篇关于一文带你搞懂Python中死锁问题的预防与处理的文章就介绍到这了,更多相关Python死锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
