python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python监控系统资源

Python实现监控系统资源的脚本工具

作者:零日失眠者

系统资源监控是系统管理员和开发人员日常工作中的重要任务,本文将介绍一个实用的Python系统资源监控工具,感兴趣的小伙伴可以了解一下

简介

系统资源监控是系统管理员和开发人员日常工作中的重要任务。通过实时监控CPU、内存、磁盘和网络等关键资源的使用情况,我们可以及时发现系统性能瓶颈、预防系统故障并优化资源配置。本文将介绍一个实用的Python脚本——系统资源监控工具,它可以实时显示系统的各项关键指标,并支持历史数据记录和告警功能。

功能介绍

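这个系统资源监控工具具有以下核心功能:

- 实时显示CPU使用率、核心数和频率
- 实时显示内存与交换分区的使用情况
- 实时显示根分区磁盘的使用情况
- 统计自监控启动以来的网络收发流量和数据包数量
- 将每次采样的数据追加保存到CSV文件
- 监控结束时生成资源使用趋势图
- 当CPU、内存或磁盘使用率超过设定阈值时输出告警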

应用场景

这个工具适用于以下场景:

- 服务器监控:持续观察服务器的资源使用情况
- 性能调优:记录一段时间内的资源变化,定位性能瓶颈
- 故障排查:结合CSV历史数据和趋势图分析问题发生时的系统状态

报错处理

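脚本包含了完善的错误处理机制:

- 获取CPU、内存、磁盘、网络信息的方法都会捕获异常,并返回包含 error 字段的结果,界面上会显示对应的"获取失败"提示,而不会中断监控
- 保存CSV文件或生成图表出错时,会打印错误信息并返回 False
- 按 Ctrl+C 可以正常停止监控,停止后仍会按需生成图表
- 主程序捕获未预期的异常,打印错误信息后以状态码 1 退出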

代码实现

import psutil
import time
import csv
import argparse
import os
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from collections import deque
import threading
import sys

class SystemResourceMonitor:
    """Sample CPU, memory, disk and network usage and report on it.

    Each call to :meth:`collect_data` takes one sample, stores it in a
    bounded history, and appends one value to every per-metric series so
    that all series stay the same length as ``timestamps``.  The samples
    can be printed to the terminal, appended to a CSV file, plotted as a
    trend chart, and checked against alert thresholds.
    """

    def __init__(self, interval=1, history_size=100):
        """Create a monitor.

        Args:
            interval: Seconds to sleep between samples in
                :meth:`start_monitoring`.
            history_size: Maximum number of samples kept in memory; older
                samples are discarded first.
        """
        self.interval = interval
        self.history_size = history_size
        self.monitoring = False
        self.data_history = deque(maxlen=history_size)

        # Per-metric series used for charting.  They are appended to in
        # lockstep so index i of each series belongs to timestamps[i].
        self.timestamps = deque(maxlen=history_size)
        self.cpu_percentages = deque(maxlen=history_size)
        self.memory_percentages = deque(maxlen=history_size)
        self.disk_usages = deque(maxlen=history_size)
        self.network_bytes_sent = deque(maxlen=history_size)
        self.network_bytes_recv = deque(maxlen=history_size)

        # Network counters at start-up; traffic is reported relative to this.
        self.net_io_baseline = psutil.net_io_counters()

    def get_cpu_info(self):
        """Return CPU usage, core count and frequency.

        Blocks for one second while psutil measures utilisation.

        Returns:
            A dict with ``percent``, ``count`` and ``frequency`` (MHz, 0 when
            unavailable), or ``{'error': message}`` if the query failed.
        """
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            cpu_count = psutil.cpu_count()
            cpu_freq = psutil.cpu_freq()

            return {
                'percent': cpu_percent,
                'count': cpu_count,
                'frequency': cpu_freq.current if cpu_freq else 0
            }
        except Exception as e:
            return {'error': str(e)}

    def get_memory_info(self):
        """Return virtual-memory and swap statistics.

        Returns:
            A dict with ``total``, ``available``, ``used``, ``percent``,
            ``swap_total``, ``swap_used`` and ``swap_percent``, or
            ``{'error': message}`` if the query failed.
        """
        try:
            memory = psutil.virtual_memory()
            swap = psutil.swap_memory()

            return {
                'total': memory.total,
                'available': memory.available,
                'used': memory.used,
                'percent': memory.percent,
                'swap_total': swap.total,
                'swap_used': swap.used,
                'swap_percent': swap.percent
            }
        except Exception as e:
            return {'error': str(e)}

    def get_disk_info(self):
        """Return usage of the ``/`` path plus system-wide disk I/O counters.

        Returns:
            A dict with ``total``, ``used``, ``free``, ``percent``,
            ``read_bytes`` and ``write_bytes`` (the I/O values are 0 when
            psutil reports no counters), or ``{'error': message}`` if the
            query failed.
        """
        try:
            disk = psutil.disk_usage('/')
            disk_io = psutil.disk_io_counters()

            return {
                'total': disk.total,
                'used': disk.used,
                'free': disk.free,
                'percent': disk.percent,
                'read_bytes': disk_io.read_bytes if disk_io else 0,
                'write_bytes': disk_io.write_bytes if disk_io else 0
            }
        except Exception as e:
            return {'error': str(e)}

    def get_network_info(self):
        """Return network traffic since the monitor was created.

        Returns:
            A dict with ``bytes_sent`` and ``bytes_recv`` (relative to the
            baseline taken in ``__init__``) and the absolute ``packets_sent``
            and ``packets_recv`` counters, or ``{'error': message}`` if the
            query failed.
        """
        try:
            net_io = psutil.net_io_counters()

            bytes_sent = net_io.bytes_sent - self.net_io_baseline.bytes_sent
            bytes_recv = net_io.bytes_recv - self.net_io_baseline.bytes_recv

            return {
                'bytes_sent': bytes_sent,
                'bytes_recv': bytes_recv,
                'packets_sent': net_io.packets_sent,
                'packets_recv': net_io.packets_recv
            }
        except Exception as e:
            return {'error': str(e)}

    def format_bytes(self, bytes_value):
        """Format a byte count with a binary unit suffix (B up to PB).

        The magnitude is used to choose the unit, so negative values (for
        example a traffic delta after a counter reset) are scaled as well.
        """
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if abs(bytes_value) < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} PB"

    def display_system_info(self, data=None):
        """Clear the terminal and print one sample.

        Args:
            data: A sample as returned by :meth:`collect_data`.  When
                omitted, fresh readings are taken; passing the sample that
                was just collected avoids measuring everything twice and
                keeps the screen consistent with what was recorded.
        """
        if data is None:
            data = {
                'timestamp': datetime.now(),
                'cpu': self.get_cpu_info(),
                'memory': self.get_memory_info(),
                'disk': self.get_disk_info(),
                'network': self.get_network_info()
            }
        shown_at = data.get('timestamp') or datetime.now()

        # Clear the screen using the command appropriate for the platform.
        os.system('cls' if os.name == 'nt' else 'clear')

        print("=" * 80)
        print("系统资源监控工具")
        print("=" * 80)
        print(f"监控时间: {shown_at.strftime('%Y-%m-%d %H:%M:%S')}")
        print()

        cpu_info = data['cpu']
        if 'error' not in cpu_info:
            print("CPU信息:")
            print(f"  使用率: {cpu_info['percent']:.1f}%")
            print(f"  核心数: {cpu_info['count']}")
            print(f"  频率: {cpu_info['frequency']:.2f} MHz")
        else:
            print(f"CPU信息获取失败: {cpu_info['error']}")
        print()

        memory_info = data['memory']
        if 'error' not in memory_info:
            print("内存信息:")
            print(f"  总计: {self.format_bytes(memory_info['total'])}")
            print(f"  已用: {self.format_bytes(memory_info['used'])}")
            print(f"  可用: {self.format_bytes(memory_info['available'])}")
            print(f"  使用率: {memory_info['percent']:.1f}%")
            if memory_info['swap_total'] > 0:
                print(f"  交换分区: {memory_info['swap_percent']:.1f}%")
        else:
            print(f"内存信息获取失败: {memory_info['error']}")
        print()

        disk_info = data['disk']
        if 'error' not in disk_info:
            print("磁盘信息:")
            print(f"  总计: {self.format_bytes(disk_info['total'])}")
            print(f"  已用: {self.format_bytes(disk_info['used'])}")
            print(f"  可用: {self.format_bytes(disk_info['free'])}")
            print(f"  使用率: {disk_info['percent']:.1f}%")
        else:
            print(f"磁盘信息获取失败: {disk_info['error']}")
        print()

        network_info = data['network']
        if 'error' not in network_info:
            print("网络信息:")
            print(f"  发送: {self.format_bytes(network_info['bytes_sent'])}")
            print(f"  接收: {self.format_bytes(network_info['bytes_recv'])}")
            print(f"  发送包: {network_info['packets_sent']:,}")
            print(f"  接收包: {network_info['packets_recv']:,}")
        else:
            print(f"网络信息获取失败: {network_info['error']}")
        print()

        print("=" * 80)
        print("按 Ctrl+C 停止监控")

    def collect_data(self):
        """Take one sample of every metric and record it.

        Returns:
            The sample: a dict with ``timestamp`` and the ``cpu``,
            ``memory``, ``disk`` and ``network`` result dicts.
        """
        timestamp = datetime.now()

        data = {
            'timestamp': timestamp,
            'cpu': self.get_cpu_info(),
            'memory': self.get_memory_info(),
            'disk': self.get_disk_info(),
            'network': self.get_network_info()
        }

        self.data_history.append(data)

        # Every series receives exactly one value per sample.  A failed
        # reading (an error dict lacks the key) is stored as NaN so the
        # series stays aligned with ``timestamps`` and shows as a gap
        # when plotted.
        missing = float('nan')
        self.timestamps.append(timestamp)
        self.cpu_percentages.append(data['cpu'].get('percent', missing))
        self.memory_percentages.append(data['memory'].get('percent', missing))
        self.disk_usages.append(data['disk'].get('percent', missing))
        self.network_bytes_sent.append(data['network'].get('bytes_sent', missing))
        self.network_bytes_recv.append(data['network'].get('bytes_recv', missing))

        return data

    def save_to_csv(self, filename="system_monitor.csv"):
        """Append the most recent sample to a CSV file.

        A header row is written first when the file does not exist yet or
        is empty.  Metrics whose reading failed are written as 0.

        Returns:
            True on success, False if writing failed (the error is printed).
        """
        fieldnames = [
            'timestamp', 'cpu_percent', 'memory_percent', 'disk_percent',
            'network_sent', 'network_recv'
        ]
        try:
            needs_header = (
                not os.path.exists(filename) or os.path.getsize(filename) == 0
            )

            with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

                if needs_header:
                    writer.writeheader()

                if self.data_history:
                    latest = self.data_history[-1]
                    # Error dicts do not carry the metric keys, so .get()
                    # falls back to 0 for failed readings.
                    writer.writerow({
                        'timestamp': latest['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
                        'cpu_percent': latest['cpu'].get('percent', 0),
                        'memory_percent': latest['memory'].get('percent', 0),
                        'disk_percent': latest['disk'].get('percent', 0),
                        'network_sent': latest['network'].get('bytes_sent', 0),
                        'network_recv': latest['network'].get('bytes_recv', 0)
                    })

            return True
        except Exception as e:
            print(f"保存CSV文件时出错: {e}")
            return False

    def generate_chart(self, filename="resource_usage.png"):
        """Plot the recorded history as a 2x2 grid of trend charts.

        Panels: CPU, memory and disk usage in percent, and network traffic
        in MB.  At least two samples are required.

        Returns:
            True if the image was written, False otherwise (the reason is
            printed).
        """
        fig = None
        try:
            if len(self.timestamps) < 2:
                print("数据不足,无法生成图表")
                return False

            times = list(self.timestamps)
            # (title, y label, x label, [(values, line format, legend label)])
            panels = [
                ('CPU使用率', '百分比 (%)', None,
                 [(list(self.cpu_percentages), 'b-', 'CPU使用率')]),
                ('内存使用率', '百分比 (%)', None,
                 [(list(self.memory_percentages), 'g-', '内存使用率')]),
                ('磁盘使用率', '百分比 (%)', '时间',
                 [(list(self.disk_usages), 'r-', '磁盘使用率')]),
                ('网络流量', '流量 (MB)', '时间',
                 [([b / 1024 / 1024 for b in self.network_bytes_sent], 'c-', '发送(MB)'),
                  ([b / 1024 / 1024 for b in self.network_bytes_recv], 'm-', '接收(MB)')]),
            ]

            fig = plt.figure(figsize=(12, 8))
            for position, (title, ylabel, xlabel, series) in enumerate(panels, start=1):
                ax = fig.add_subplot(2, 2, position)

                # Skip a panel whose series cannot be paired with the
                # timestamps instead of failing the whole chart.
                if not all(len(values) == len(times) for values, _, _ in series):
                    continue

                for values, line_format, label in series:
                    ax.plot(times, values, line_format, label=label)
                ax.set_title(title)
                ax.set_ylabel(ylabel)
                if xlabel:
                    ax.set_xlabel(xlabel)
                ax.legend()
                ax.grid(True)

                # AutoDateLocator picks a tick spacing that suits the span
                # of the data, so short runs still get labelled ticks.
                ax.xaxis.set_major_locator(mdates.AutoDateLocator())
                ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
                plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)

            fig.tight_layout()
            fig.savefig(filename, dpi=300, bbox_inches='tight')

            print(f"图表已保存到: {filename}")
            return True
        except Exception as e:
            print(f"生成图表时出错: {e}")
            return False
        finally:
            # Release the figure even when plotting or saving failed.
            if fig is not None:
                plt.close(fig)

    def check_thresholds(self, thresholds):
        """Print an alert for each metric in the latest sample above its limit.

        Args:
            thresholds: Mapping with optional ``cpu``, ``memory`` and
                ``disk`` limits in percent.  A missing key or a value of
                ``None`` disables that check; ``0`` is a valid limit.
        """
        if not self.data_history or not thresholds:
            return

        latest = self.data_history[-1]

        for key, label in (('cpu', 'CPU'), ('memory', '内存'), ('disk', '磁盘')):
            limit = thresholds.get(key)
            reading = latest[key]
            if limit is None or 'error' in reading:
                continue
            percent = reading['percent']
            if percent > limit:
                print(f"⚠️  {label}使用率过高: {percent:.1f}% (阈值: {limit}%)")

    def start_monitoring(self, duration=None, csv_file=None, chart_file=None, thresholds=None):
        """Run the sample/display loop until stopped.

        The loop ends when ``duration`` seconds have elapsed, when
        ``self.monitoring`` is set to False, or on Ctrl+C.

        Args:
            duration: Seconds to monitor; ``None`` runs until interrupted.
            csv_file: If given, each sample is appended to this CSV file.
            chart_file: If given, a trend chart is written here on exit.
            thresholds: Optional limits passed to :meth:`check_thresholds`.
        """
        self.monitoring = True
        # A monotonic clock keeps the elapsed time correct even if the
        # wall clock is adjusted while monitoring.
        started = time.monotonic()

        try:
            while self.monitoring:
                sample = self.collect_data()

                # Show the sample that was just recorded rather than
                # measuring everything a second time.
                self.display_system_info(sample)

                if csv_file:
                    self.save_to_csv(csv_file)

                if thresholds:
                    self.check_thresholds(thresholds)

                if duration is not None and (time.monotonic() - started) >= duration:
                    break

                time.sleep(self.interval)

        except KeyboardInterrupt:
            print("\n\n停止监控...")
        finally:
            self.monitoring = False

            if chart_file:
                self.generate_chart(chart_file)

            print("监控已停止")

def main():
    """Parse the command line and run the monitor until it stops.

    Exits with status 2 on invalid arguments (via argparse) and status 1
    if monitoring fails with an unexpected error.
    """
    parser = argparse.ArgumentParser(description="系统资源监控工具")
    parser.add_argument("-i", "--interval", type=int, default=5,
                        help="监控间隔(秒,默认:5)")
    parser.add_argument("-d", "--duration", type=int,
                        help="监控时长(秒,不指定则持续监控)")
    parser.add_argument("-c", "--csv", help="保存数据到CSV文件")
    parser.add_argument("-g", "--graph", help="生成图表文件")
    # argparse applies %-formatting to help text, so a literal percent sign
    # must be written as "%%"; a bare "%" makes --help raise ValueError.
    parser.add_argument("--cpu-threshold", type=float,
                        help="CPU使用率告警阈值(%%)")
    parser.add_argument("--memory-threshold", type=float,
                        help="内存使用率告警阈值(%%)")
    parser.add_argument("--disk-threshold", type=float,
                        help="磁盘使用率告警阈值(%%)")

    args = parser.parse_args()

    # Reject a negative interval up front; otherwise time.sleep() would
    # only fail after the first monitoring cycle.
    if args.interval < 0:
        parser.error("监控间隔不能为负数")

    # Keep every threshold the user actually supplied.  Comparing against
    # None (rather than truthiness) means a threshold of 0 is honoured.
    supplied = {
        'cpu': args.cpu_threshold,
        'memory': args.memory_threshold,
        'disk': args.disk_threshold,
    }
    thresholds = {name: limit for name, limit in supplied.items() if limit is not None}

    try:
        monitor = SystemResourceMonitor(interval=args.interval)
        monitor.start_monitoring(
            duration=args.duration,
            csv_file=args.csv,
            chart_file=args.graph,
            thresholds=thresholds
        )

    except Exception as e:
        print(f"程序执行出错: {e}")
        sys.exit(1)


# Run the monitor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

使用方法

安装依赖

在使用此脚本之前,需要安装必要的库:

pip install psutil matplotlib

基本使用

# 基本用法,每5秒刷新一次
python system_monitor.py

# 设置监控间隔为10秒
python system_monitor.py -i 10

# 监控60秒后自动停止
python system_monitor.py -d 60

# 保存监控数据到CSV文件
python system_monitor.py -c monitor_data.csv

# 生成资源使用趋势图
python system_monitor.py -g resource_trend.png

# 设置告警阈值
python system_monitor.py --cpu-threshold 80 --memory-threshold 85 --disk-threshold 90

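命令行参数说明

- -i, --interval:监控间隔(秒),默认为 5
- -d, --duration:监控时长(秒),不指定则持续监控
- -c, --csv:将监控数据保存到指定的CSV文件
- -g, --graph:监控结束时生成资源使用趋势图并保存到指定文件
- --cpu-threshold:CPU使用率告警阈值(%)
- --memory-threshold:内存使用率告警阈值(%)
- --disk-threshold:磁盘使用率告警阈值(%)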

使用示例

持续监控系统资源

python system_monitor.py

监控并保存数据

python system_monitor.py -i 10 -d 300 -c system_data.csv -g trend.png

带告警的监控

python system_monitor.py --cpu-threshold 80 --memory-threshold 85

总结

这个系统资源监控工具提供了一个全面的系统监控解决方案,能够实时显示CPU、内存、磁盘和网络等关键资源的使用情况。它支持数据持久化、可视化图表生成和阈值告警等功能,适用于服务器监控、性能调优和故障排查等多种场景。通过这个工具,用户可以更好地了解系统运行状态,及时发现和解决性能问题,确保系统的稳定运行。

以上就是Python实现监控系统资源的脚本工具的详细内容,更多关于Python监控系统资源的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文