Smart Disk Monitoring and Automatic Junk-File Cleanup with Python
Author: 红魔Y
Running out of disk space is one of the most frequent problems in IT operations. This article walks through how to use Python to monitor disk space, scan for large files, detect duplicates and clean up junk automatically.
"The C: drive is full" is probably the help request IT ops hears most often. Instead of cleaning it up by hand every time, let Python take care of it.
What Actually Eats Disk Space
Common causes of disks filling up in a corporate environment:
- Log file bloat: IIS logs, application logs and database logs that nobody ever rotates
- Piles of temporary files: the Windows temp directory, browser caches, installer leftovers
- Duplicate files: the same file copied N times into different locations
- Forgotten large files: ISOs copied over for testing, database backups, video recordings
- The Recycle Bin and thumbnail caches: users delete files but never empty the bin
Troubleshooting by hand? Right-click, Properties, see which drive is full, then dig down folder by folder... painfully slow.
Basics: A Full Disk-Space Scan
import os
import shutil
from pathlib import Path
from collections import defaultdict
def get_disk_usage():
    """Report usage for every logical disk partition."""
    print("=" * 60)
    print(f"{'Drive':<8}{'Total':<12}{'Used':<12}{'Free':<12}{'Usage':<8}")
    print("=" * 60)
    results = []
    # Note: wmic prints columns alphabetically regardless of the order in the
    # query, i.e. Caption, FreeSpace, Size
    for partition in os.popen("wmic logicaldisk get caption,size,freespace").readlines()[1:]:
        parts = partition.strip().split()
        if len(parts) < 3:
            continue
        drive = parts[0]
        free = int(parts[1])   # FreeSpace, in bytes
        total = int(parts[2])  # Size, in bytes
        used = total - free
        usage_percent = (used / total * 100) if total > 0 else 0
        # Convert to GB for display
        total_gb = total / 1024**3
        used_gb = used / 1024**3
        free_gb = free / 1024**3
        # Pick a status marker based on usage
        if usage_percent >= 90:
            icon = "🔴"
        elif usage_percent >= 80:
            icon = "🟡"
        else:
            icon = "🟢"
        print(f"{icon} {drive:<6}{total_gb:>8.1f} GB{used_gb:>8.1f} GB"
              f"{free_gb:>8.1f} GB{usage_percent:>6.1f}%")
        results.append({
            "drive": drive,
            "total_gb": total_gb,
            "used_gb": used_gb,
            "free_gb": free_gb,
            "usage_percent": usage_percent,
        })
    return results
# Usage
# disks = get_disk_usage()
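If you would rather not depend on wmic (newer Windows builds are phasing it out), shutil.disk_usage offers a simpler, cross-platform route. The sketch below is a minimal alternative rather than part of the script above; probing every possible drive letter is an assumption for a typical Windows host.
import os
import shutil
import string
def get_disk_usage_portable(drives=None):
    """Report usage per drive via shutil.disk_usage (works on Windows/Linux/macOS)."""
    if drives is None:
        # Assumption: probe all possible Windows drive letters
        drives = [f"{letter}:\\" for letter in string.ascii_uppercase]
    results = []
    for drive in drives:
        if not os.path.exists(drive):
            continue
        usage = shutil.disk_usage(drive)  # named tuple: total, used, free (bytes)
        percent = usage.used / usage.total * 100 if usage.total else 0
        print(f"{drive:<8}{usage.total/1024**3:>8.1f} GB{usage.used/1024**3:>8.1f} GB"
              f"{usage.free/1024**3:>8.1f} GB{percent:>6.1f}%")
        results.append({"drive": drive, "usage_percent": round(percent, 1)})
    return results
# get_disk_usage_portable()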
Hands-On 1: A Large-File Scanner
Quickly find the files taking up the most space:
def scan_large_files(
    directory,
    min_size_mb=100,
    top_n=20,
    exclude_dirs=None
):
    """
    Scan for large files.
    directory: directory to scan
    min_size_mb: minimum file size in MB
    top_n: how many results to print
    exclude_dirs: list of directories to skip
    """
    if exclude_dirs is None:
        exclude_dirs = [
            r"C:\Windows", r"C:\Program Files",
            r"C:\Program Files (x86)",
        ]
    large_files = []
    min_size_bytes = min_size_mb * 1024 * 1024
    print(f"Scanning {directory} (minimum {min_size_mb} MB)...")
    for root, dirs, files in os.walk(directory):
        # Prune excluded directories in place so os.walk skips them
        dirs[:] = [
            d for d in dirs
            if not any(
                os.path.normpath(os.path.join(root, d)).startswith(
                    os.path.normpath(excl)
                )
                for excl in exclude_dirs
            )
        ]
        for file in files:
            try:
                filepath = os.path.join(root, file)
                size = os.path.getsize(filepath)
                if size >= min_size_bytes:
                    mtime = os.path.getmtime(filepath)
                    large_files.append({
                        "path": filepath,
                        "size_mb": round(size / 1024**2, 1),
                        "size_bytes": size,
                        "modified": mtime,
                    })
            except (PermissionError, OSError):
                continue
    # Sort by size, largest first
    large_files.sort(key=lambda x: x["size_bytes"], reverse=True)
    print(f"\nFound {len(large_files)} large files, top {top_n}:\n")
    from datetime import datetime
    for f in large_files[:top_n]:
        mod_time = datetime.fromtimestamp(f["modified"]).strftime("%Y-%m-%d")
        print(f"  {f['size_mb']:>8.1f} MB  {mod_time}  {f['path']}")
    return large_files
# Usage: scan the D: drive for files over 500 MB
# large_files = scan_large_files("D:\\", min_size_mb=500, top_n=30)
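In practice the scan result usually has to be handed to whoever owns the files. The following is a small sketch, assuming you want a CSV report that opens cleanly in Excel; the helper name and output filename are just examples.
import csv
from datetime import datetime
def export_large_files_csv(large_files, out_path="large_files_report.csv"):
    """Write the result of scan_large_files() to a CSV report."""
    with open(out_path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(["size_mb", "modified", "path"])
        for item in large_files:
            modified = datetime.fromtimestamp(item["modified"]).strftime("%Y-%m-%d")
            writer.writerow([item["size_mb"], modified, item["path"]])
    print(f"Report written to {out_path}")
# export_large_files_csv(large_files)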
Hands-On 2: Directory Size Analysis
Find out which directories occupy the most space:
def get_directory_sizes(
    root_dir,
    max_depth=2,
    top_n=15,
    exclude_dirs=None
):
    """
    Analyse directory sizes.
    max_depth: how many levels below root_dir to report
    """
    if exclude_dirs is None:
        exclude_dirs = []
    dir_sizes = defaultdict(int)
    print(f"Analysing directory sizes under {root_dir}...")
    for root, dirs, files in os.walk(root_dir):
        # Skip excluded directories
        dirs[:] = [
            d for d in dirs
            if not any(os.path.join(root, d).startswith(excl) for excl in exclude_dirs)
        ]
        # Sum the files that live directly in this directory
        current_size = 0
        for file in files:
            try:
                current_size += os.path.getsize(os.path.join(root, file))
            except (PermissionError, OSError):
                pass
        # Roll the size up to every ancestor directory within max_depth,
        # so each reported directory reflects the size of its whole subtree
        rel_path = os.path.relpath(root, root_dir)
        parts = [] if rel_path == "." else rel_path.split(os.sep)
        for depth in range(min(len(parts), max_depth) + 1):
            ancestor = os.path.join(root_dir, *parts[:depth])
            dir_sizes[ancestor] += current_size
    # Sort; percentages are measured against the total size of root_dir
    total = dir_sizes.get(root_dir, 0)
    sorted_dirs = sorted(
        ((p, s) for p, s in dir_sizes.items() if p != root_dir),
        key=lambda x: x[1], reverse=True
    )
    print(f"\nTop {top_n} directories:\n")
    for path, size in sorted_dirs[:top_n]:
        size_gb = size / 1024**3
        percent = (size / total * 100) if total > 0 else 0
        # Simple bar-chart visualisation
        bar_len = 30
        filled = int(bar_len * percent / 100)
        bar = "█" * filled + "░" * (bar_len - filled)
        print(f"  {size_gb:>8.2f} GB {bar} {percent:>5.1f}%")
        print(f"    {path}")
    return sorted_dirs
# Usage
# dirs = get_directory_sizes(r"C:\Users", max_depth=2, exclude_dirs=[r"C:\Users\Default"])
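On directories with hundreds of thousands of entries, os.walk plus a getsize() call per file can be slow. Below is a sketch of an os.scandir-based variant that reuses the stat information attached to each directory entry; the actual speed-up depends on the platform and file system.
def get_tree_size(path):
    """Total size of a directory tree using os.scandir (fewer stat() calls)."""
    total = 0
    try:
        with os.scandir(path) as entries:
            for entry in entries:
                try:
                    if entry.is_file(follow_symlinks=False):
                        total += entry.stat(follow_symlinks=False).st_size
                    elif entry.is_dir(follow_symlinks=False):
                        total += get_tree_size(entry.path)
                except (PermissionError, OSError):
                    continue
    except (PermissionError, OSError):
        pass
    return total
# print(get_tree_size(r"C:\Users") / 1024**3, "GB")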
Hands-On 3: Duplicate File Detection
Cleaning up duplicate files can free a surprising amount of space:
import hashlib
def get_file_hash(filepath, block_size=65536):
    """Compute the MD5 hash of a file, reading it in blocks."""
    hasher = hashlib.md5()
    try:
        with open(filepath, "rb") as f:
            for block in iter(lambda: f.read(block_size), b""):
                hasher.update(block)
        return hasher.hexdigest()
    except (PermissionError, OSError):
        return None
def find_duplicate_files(directory, min_size_kb=100):
    """
    Find duplicate files using a two-step check: file size first, then MD5.
    min_size_kb: only consider files larger than this
    """
    # Step 1: group files by size
    print("Step 1: grouping files by size...")
    size_groups = defaultdict(list)
    for root, dirs, files in os.walk(directory):
        for file in files:
            try:
                filepath = os.path.join(root, file)
                size = os.path.getsize(filepath)
                if size >= min_size_kb * 1024:
                    size_groups[size].append(filepath)
            except (PermissionError, OSError):
                continue
    # Keep only groups with more than one file
    potential_duplicates = {
        size: files for size, files in size_groups.items()
        if len(files) >= 2
    }
    print(f"Found {len(potential_duplicates)} groups of files with identical sizes")
    # Step 2: hash the size-matched files to confirm real duplicates
    print("Step 2: computing MD5 to confirm duplicates...")
    duplicates = {}  # hash -> [filepath, ...]
    total_to_check = sum(len(files) for files in potential_duplicates.values())
    checked = 0
    for size, files in potential_duplicates.items():
        for filepath in files:
            file_hash = get_file_hash(filepath)
            checked += 1
            if checked % 200 == 0:
                print(f"  ...{checked}/{total_to_check} files hashed")
            if file_hash:
                if file_hash not in duplicates:
                    duplicates[file_hash] = []
                duplicates[file_hash].append(filepath)
    # Keep only hashes that really have duplicates
    true_duplicates = {
        h: files for h, files in duplicates.items()
        if len(files) >= 2
    }
    # Estimate how much space could be reclaimed
    total_wasted = 0
    for h, files in true_duplicates.items():
        # One copy is kept; every extra copy is wasted space
        size = os.path.getsize(files[0])
        total_wasted += size * (len(files) - 1)
    print(f"\nFound {len(true_duplicates)} groups of duplicate files")
    print(f"Reclaimable space: {total_wasted / 1024**3:.2f} GB\n")
    # Sort groups by wasted space
    dup_sorted = sorted(
        true_duplicates.items(),
        key=lambda x: os.path.getsize(x[1][0]) * (len(x[1]) - 1),
        reverse=True
    )
    for h, files in dup_sorted[:10]:
        size = os.path.getsize(files[0])
        wasted = size * (len(files) - 1)
        print(f"  {len(files)} copies, {size/1024**2:.1f} MB each, "
              f"{wasted/1024**2:.1f} MB wasted")
        for f in files:
            print(f"    {f}")
        print()
    return true_duplicates
def cleanup_duplicates(duplicate_map, dry_run=True):
    """
    Clean up duplicate files, keeping the first file in each group.
    dry_run: True = report only, delete nothing
    """
    total_freed = 0
    for h, files in duplicate_map.items():
        # Keep the first copy, remove the rest
        keep = files[0]
        remove = files[1:]
        for filepath in remove:
            if dry_run:
                size = os.path.getsize(filepath)
                total_freed += size
                print(f"  [simulate delete] {filepath} ({size/1024**2:.1f} MB)")
            else:
                try:
                    size = os.path.getsize(filepath)
                    os.remove(filepath)
                    total_freed += size
                    print(f"  [deleted] {filepath} ({size/1024**2:.1f} MB)")
                except Exception as e:
                    print(f"  [failed] {filepath}: {e}")
    action = "Reclaimable" if dry_run else "Reclaimed"
    print(f"\n{action} space: {total_freed / 1024**3:.2f} GB")
    if dry_run:
        print("\n⚠️ This was a dry run; no files were actually deleted.")
        print("Once you have reviewed the report, rerun with dry_run=False.")
    return total_freed
# Usage
# dups = find_duplicate_files(r"D:\Data", min_size_kb=1024)
# cleanup_duplicates(dups, dry_run=True)   # simulate first
# cleanup_duplicates(dups, dry_run=False)  # delete after reviewing
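Deleting duplicates outright is not always acceptable, because some of the paths may be referenced by scripts or shortcuts. On NTFS, an alternative worth knowing is to replace each extra copy with a hard link to the kept file, so every path still resolves while only one copy occupies space. The helper below is a sketch and not part of the cleanup flow above; it assumes both paths sit on the same volume and that the account is allowed to create hard links.
def dedupe_with_hardlinks(duplicate_map, dry_run=True):
    """Replace extra copies with hard links to the first file in each group (sketch)."""
    for _, files in duplicate_map.items():
        keep = files[0]
        for dup in files[1:]:
            # Hard links only work within a single drive/volume
            if os.path.splitdrive(keep)[0].lower() != os.path.splitdrive(dup)[0].lower():
                continue
            if dry_run:
                print(f"  [simulate] link {dup} -> {keep}")
                continue
            try:
                os.remove(dup)
                os.link(keep, dup)  # recreate the old path as a hard link
                print(f"  [linked] {dup} -> {keep}")
            except OSError as e:
                print(f"  [failed] {dup}: {e}")
# dedupe_with_hardlinks(dups, dry_run=True)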
Hands-On 4: Smart Cleanup Strategies
Different kinds of junk files call for different cleanup strategies:
from datetime import datetime, timedelta
import re
import time
class DiskCleaner:
    """Smart disk cleaner."""
    # Directories that are commonly safe to clean
    CLEANUP_TARGETS = {
        "Windows temp files": r"C:\Windows\Temp",
        "User temp files": os.path.join(os.environ.get("TEMP", ""), ""),
        "Thumbnail cache": os.path.join(
            os.environ.get("LOCALAPPDATA", ""), "Microsoft", "Windows", "Explorer"
        ),
        "Windows Update cache": r"C:\Windows\SoftwareDistribution\Download",
        "Prefetch cache": r"C:\Windows\Prefetch",
    }
    # File patterns that are usually safe to clean
    # (reference patterns; not wired into the methods below, extend as needed)
    CLEANUP_PATTERNS = {
        "Log files": [r"\.log$"],
        "Temp files": [r"\.tmp$", r"\.temp$", r"~\$"],
        "Cache files": [r"\.cache$", r"\.bak$"],
        "Thumbnail databases": [r"thumbcache_.*\.db$"],
    }
    # Directory patterns that are usually safe to clean
    CLEANUP_DIR_PATTERNS = {
        "node_modules": r"node_modules$",
        "__pycache__": r"__pycache__$",
        ".pytest_cache": r"\.pytest_cache$",
        "pip cache": r"pip[\\/]cache$",
    }
    def __init__(self, dry_run=True):
        self.dry_run = dry_run
        self.report = defaultdict(lambda: {"count": 0, "size": 0})
    def clean_temp_directories(self):
        """Clean the system temp directories."""
        print("\n=== Cleaning temp directories ===")
        for name, path in self.CLEANUP_TARGETS.items():
            if not os.path.exists(path):
                continue
            total_size = 0
            file_count = 0
            for item in os.listdir(path):
                item_path = os.path.join(path, item)
                try:
                    if os.path.isfile(item_path):
                        size = os.path.getsize(item_path)
                        total_size += size
                        file_count += 1
                        if not self.dry_run:
                            os.remove(item_path)
                    elif os.path.isdir(item_path):
                        size = sum(
                            os.path.getsize(os.path.join(dp, f))
                            for dp, dn, fn in os.walk(item_path)
                            for f in fn
                        )
                        total_size += size
                        file_count += 1
                        if not self.dry_run:
                            shutil.rmtree(item_path)
                except (PermissionError, OSError):
                    pass
            self.report[name]["count"] = file_count
            self.report[name]["size"] = total_size
            action = "can clean" if self.dry_run else "cleaned"
            if file_count > 0:
                print(f"  {name}: {action} {file_count} items "
                      f"({total_size/1024/1024:.1f} MB)")
    def clean_old_logs(self, days=30, log_dirs=None):
        """Clean log files older than N days."""
        print(f"\n=== Cleaning logs older than {days} days ===")
        if log_dirs is None:
            log_dirs = [
                r"C:\inetpub\logs",
                r"C:\Windows\System32\winevt\Logs",
            ]
        cutoff = time.time() - (days * 86400)
        total_cleaned = 0
        for log_dir in log_dirs:
            if not os.path.exists(log_dir):
                continue
            for root, dirs, files in os.walk(log_dir):
                for file in files:
                    if not file.endswith(".log"):
                        continue
                    filepath = os.path.join(root, file)
                    try:
                        mtime = os.path.getmtime(filepath)
                        if mtime < cutoff:
                            size = os.path.getsize(filepath)
                            total_cleaned += size
                            if self.dry_run:
                                mod_date = datetime.fromtimestamp(mtime).strftime("%Y-%m-%d")
                                print(f"  [simulate] {filepath} ({size/1024/1024:.1f} MB, {mod_date})")
                            else:
                                os.remove(filepath)
                    except (PermissionError, OSError):
                        pass
        action = "reclaimable" if self.dry_run else "reclaimed"
        print(f"\nLog cleanup: {action} {total_cleaned/1024/1024:.1f} MB")
    def clean_recycle_bin(self):
        """Empty the Recycle Bin."""
        import ctypes
        print("\n=== Emptying the Recycle Bin ===")
        if self.dry_run:
            print("  [simulate] empty the Recycle Bin")
            return
        try:
            # Windows API: SHEmptyRecycleBinW with no confirmation, UI or sound
            result = ctypes.windll.shell32.SHEmptyRecycleBinW(None, None, 7)
            if result == 0:
                print("  Recycle Bin emptied")
            else:
                print(f"  SHEmptyRecycleBinW returned: {result}")
        except Exception as e:
            print(f"  Failed to empty the Recycle Bin: {e}")
    def run_full_cleanup(self):
        """Run the full cleanup."""
        mode = "dry-run" if self.dry_run else "live"
        print(f"{'=' * 60}")
        print(f" Disk cleanup ({mode} mode)")
        print(f"{'=' * 60}")
        self.clean_temp_directories()
        self.clean_old_logs(days=30)
        self.clean_recycle_bin()
        # Summary
        total_size = sum(item["size"] for item in self.report.values())
        print(f"\n{'=' * 60}")
        action = "Total reclaimable" if self.dry_run else "Total reclaimed"
        print(f" {action}: {total_size / 1024 / 1024:.1f} MB")
        print(f"{'=' * 60}")
        if self.dry_run:
            print("\nOnce the report looks right, rerun with DiskCleaner(dry_run=False)")
        return total_size
# Usage
# cleaner = DiskCleaner(dry_run=True)   # dry run first
# cleaner.run_full_cleanup()
# cleaner = DiskCleaner(dry_run=False)  # then run for real
# cleaner.run_full_cleanup()
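To make cleanup genuinely automatic, the script can be registered as a Windows scheduled task. Below is a sketch using the schtasks command; the task name, script path and run time are placeholders, and creating the task usually requires an elevated prompt.
import subprocess
import sys
def register_daily_cleanup(script_path=r"C:\scripts\disk_cleanup.py", when="03:00"):
    """Register a daily scheduled task that runs the cleanup script (sketch)."""
    cmd = [
        "schtasks", "/Create",
        "/TN", "PyDiskCleanup",                        # task name (example)
        "/TR", f'"{sys.executable}" "{script_path}"',  # command the task runs
        "/SC", "DAILY",
        "/ST", when,                                   # start time, HH:MM
        "/RL", "HIGHEST",                              # run with highest privileges
        "/F",                                          # overwrite an existing task
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    print(result.stdout or result.stderr)
# register_daily_cleanup()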
Hands-On 5: Disk Space Monitoring and Alerts
import json
from datetime import datetime, timedelta
from pathlib import Path
class DiskMonitor:
    """Continuous disk space monitor."""
    def __init__(self, data_file="disk_history.json"):
        self.data_file = Path(data_file)
        self.history = self._load_history()
        self.threshold_warning = 80   # warning threshold, %
        self.threshold_critical = 95  # critical threshold, %
    def _load_history(self):
        """Load historical snapshots."""
        if self.data_file.exists():
            with open(self.data_file, "r", encoding="utf-8") as f:
                return json.load(f)
        return []
    def _save_history(self):
        """Save historical snapshots."""
        with open(self.data_file, "w", encoding="utf-8") as f:
            json.dump(self.history, f, ensure_ascii=False, indent=2)
    def record_snapshot(self):
        """Record a snapshot of current disk usage."""
        snapshot = {
            "timestamp": datetime.now().isoformat(),
            "drives": [],
        }
        # wmic prints columns alphabetically: Caption, FreeSpace, Size
        for partition in os.popen(
            "wmic logicaldisk get caption,size,freespace"
        ).readlines()[1:]:
            parts = partition.strip().split()
            if len(parts) < 3:
                continue
            free = int(parts[1])
            total = int(parts[2])
            usage = ((total - free) / total * 100) if total > 0 else 0
            snapshot["drives"].append({
                "drive": parts[0],
                "total_gb": round(total / 1024**3, 2),
                "free_gb": round(free / 1024**3, 2),
                "usage_percent": round(usage, 1),
            })
        self.history.append(snapshot)
        # Keep only the last 90 days of data
        cutoff = (datetime.now() - timedelta(days=90)).isoformat()
        self.history = [
            h for h in self.history
            if h["timestamp"] >= cutoff
        ]
        self._save_history()
        return snapshot
    def check_alerts(self, snapshot=None):
        """Check whether any drive needs an alert."""
        if not snapshot:
            snapshot = self.record_snapshot()
        alerts = []
        for drive in snapshot["drives"]:
            usage = drive["usage_percent"]
            free_gb = drive["free_gb"]
            if usage >= self.threshold_critical:
                alerts.append({
                    "severity": "CRITICAL",
                    "drive": drive["drive"],
                    "message": (
                        f"{drive['drive']} is critically low on space! "
                        f"{usage}% used, only {free_gb:.1f} GB free"
                    ),
                })
            elif usage >= self.threshold_warning:
                alerts.append({
                    "severity": "WARNING",
                    "drive": drive["drive"],
                    "message": (
                        f"{drive['drive']} low-space warning: "
                        f"{usage}% used, {free_gb:.1f} GB free"
                    ),
                })
        return alerts
    def analyze_trend(self, drive_letter, days=30):
        """Analyse the usage trend of one drive."""
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        recent = [
            h for h in self.history
            if h["timestamp"] >= cutoff
        ]
        # Collect the data points for the requested drive
        data_points = []
        for h in recent:
            for d in h["drives"]:
                if d["drive"] == drive_letter:
                    data_points.append({
                        "time": h["timestamp"],
                        "usage": d["usage_percent"],
                        "free_gb": d["free_gb"],
                    })
                    break
        if len(data_points) < 2:
            print(f"Not enough data (need at least 2 data points, have {len(data_points)})")
            return None
        # Trend: compare the first and last snapshots
        first = data_points[0]
        last = data_points[-1]
        usage_change = last["usage"] - first["usage"]
        free_change = last["free_gb"] - first["free_gb"]
        # Actual time span in days, derived from the snapshot timestamps
        span = datetime.fromisoformat(last["time"]) - datetime.fromisoformat(first["time"])
        days_span = max(span.total_seconds() / 86400, 1)
        daily_usage_increase = usage_change / days_span
        daily_free_decrease = abs(free_change) / days_span
        # Estimate when the thresholds will be reached
        if daily_usage_increase > 0:
            days_to_warning = (self.threshold_warning - last["usage"]) / daily_usage_increase
            days_to_critical = (self.threshold_critical - last["usage"]) / daily_usage_increase
        else:
            days_to_warning = float('inf')
            days_to_critical = float('inf')
        def fmt_eta(d):
            if d <= 0:
                return "already exceeded"
            return f"{d:.0f} days" if d < 365 else "no risk"
        report = {
            "drive": drive_letter,
            "period": f"last {days} days",
            "data_points": len(data_points),
            "current_usage": f"{last['usage']}%",
            "current_free": f"{last['free_gb']} GB",
            "usage_change": f"+{usage_change:.1f}%" if usage_change > 0 else f"{usage_change:.1f}%",
            "free_change": f"{free_change:.1f} GB",
            "daily_increase": f"+{daily_usage_increase:.2f}%" if daily_usage_increase > 0 else "stable",
            "days_to_warning": fmt_eta(days_to_warning),
            "days_to_critical": fmt_eta(days_to_critical),
        }
        print(f"\n=== Usage trend for {drive_letter} ({report['period']}) ===")
        print(f"  Current usage: {report['current_usage']}, free: {report['current_free']}")
        print(f"  Change: usage {report['usage_change']}, space {report['free_change']}")
        print(f"  Daily growth: {report['daily_increase']}")
        print(f"  Estimated time to warning threshold: {report['days_to_warning']}")
        print(f"  Estimated time until the disk is full: {report['days_to_critical']}")
        return report
# Usage
# monitor = DiskMonitor()
# monitor.record_snapshot()  # record a snapshot
# alerts = monitor.check_alerts()
# monitor.analyze_trend("C:", days=30)
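Putting the pieces together: once a drive crosses the warning threshold, the monitor can trigger the cleaner from the previous section. The function below is a sketch of how the two classes in this article might be wired up, starting with a dry run.
def monitor_and_clean():
    """Record a snapshot and launch a cleanup pass if any drive is in alarm."""
    monitor = DiskMonitor()
    alerts = monitor.check_alerts()  # records a snapshot as a side effect
    if not alerts:
        print("All drives healthy, nothing to do.")
        return
    for alert in alerts:
        print(f"[{alert['severity']}] {alert['message']}")
    # Start with a simulated cleanup; switch to dry_run=False once the
    # report has been reviewed and trusted.
    cleaner = DiskCleaner(dry_run=True)
    cleaner.run_full_cleanup()
# monitor_and_clean()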
Wrap-Up
| Need | Approach | Difficulty |
|---|---|---|
| Disk overview | wmic / shutil.disk_usage | Beginner |
| Large-file scan | os.walk + sort by size | Beginner |
| Directory size analysis | os.walk + depth control | Intermediate |
| Duplicate detection | file size + MD5 double check | Intermediate |
| Automatic cleanup | per-type cleanup strategies | Intermediate |
| Trend analysis | snapshot history + trend extrapolation | Advanced |
| Alert notification | threshold check + email/WeChat push (sketch below) | Advanced |
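For the notification row above, here is a sketch of pushing alerts by email with smtplib. The SMTP host, sender, recipients and credentials are placeholders to replace with your own relay settings; a WeChat or webhook push would follow the same pattern with an HTTP POST instead.
import smtplib
from email.mime.text import MIMEText
def send_alert_email(alerts,
                     smtp_host="smtp.example.com",      # placeholder relay
                     sender="ops@example.com",          # placeholder sender
                     recipients=("admin@example.com",)):
    """Send disk alerts as a plain-text email (sketch)."""
    if not alerts:
        return
    body = "\n".join(f"[{a['severity']}] {a['message']}" for a in alerts)
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = "Disk space alert"
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)
    with smtplib.SMTP(smtp_host, 25) as server:
        # server.starttls(); server.login(sender, "app-password")  # if your relay requires auth
        server.sendmail(sender, list(recipients), msg.as_string())
# send_alert_email(monitor.check_alerts())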
Disk space management is a fundamental ops skill. With monitoring in place, alerts configured and cleanup automated, "the C: drive is full" gets stamped out before it ever becomes a ticket.
