基于Python实现Windows带宽监控工具
作者:盗理者
环境
Python 3.12.2
psutil 版本: 7.0.0
Flask 版本: 3.1.2
matplotlib 版本: 3.10.6
pip 安装指定版本
pip install psutil==7.0.0 flask==3.1.2 matplotlib==3.10.6
介绍
做一个适用于 windows 的带宽监控工具,这个工具融合了Python后端开发、多线程、系统信息采集、Web框架、前端可视化等多个技术领域。
源码可以看开源地址
会使用的库
库名 | 核心作用 | 代码中的应用场景 |
---|---|---|
threading | 实现多线程编程,让程序“同时做多个任务” | 创建TrafficMonitor监控线程,与FlaskWeb服务并行运行 |
time | 提供时间相关功能(延迟、时间戳等) | time.sleep(1)实现每秒采集一次流量数据 |
datetime | 处理日期和时间,提供更丰富的时间格式 | 记录流量数据对应的时间戳(如datetime.now()) |
collections.deque | 双端队列,支持固定长度(自动丢弃旧数据) | 存储最近5分钟的流量数据(maxlen=300,每秒1条共300秒) |
io | 内存中的输入输出流,模拟文件操作 | 存储Matplotlib生成的图表(避免写入本地文件) |
base64 | 将二进制数据编码为文本格式 | 把图表二进制数据转为base64,方便前端HTML显示 |
json | 处理JSON数据(Python字典与JSON字符串互转) | 生成JSON格式的流量报告,供前端下载 |
os | 与操作系统交互(本代码中未实际使用,预留扩展) | 可用于获取系统信息、管理文件路径等 |
多线程关键代码
class TrafficMonitor(threading.Thread): def __init__(self, interface_name=None): super().__init__() # 调用父类构造函数 self.daemon = True # 设置为守护线程 self.running = True # 控制线程循环的开关 def run(self): # 1. 自动选择网络接口(优先非回环接口,且有发送数据的接口) # 2. 初始化上次流量统计值(用于计算每秒增量) # 3. 循环采集数据: while self.running: current_stats = psutil.net_io_counters(pernic=True)[self.interface_name] incoming_rate = current_stats.bytes_recv - self.last_incoming # 每秒入站流量 outgoing_rate = current_stats.bytes_sent - self.last_outgoing # 每秒出站流量 # 更新全局变量(供Web端调用) traffic_data['incoming'].append(incoming_rate) time.sleep(1) # 每秒采集一次 def stop(self): self.running = False # 关闭循环,线程退出
系统流量采集:用 psutil 获取网络数据
概念:网络流量的“增量”与“总量”
总量(bytes_recv/bytes_sent):从系统启动到当前,网络接口接收/发送的总字节数(不会重置)。
增量(每秒流量):当前总量 - 上次总量,即每秒的实际流量(如“100KB/s”)。
代码中的流量采集逻辑
选择网络接口:若未指定接口(如TrafficMonitor()),代码会自动遍历所有接口,排除回环接口(lo,本地测试用),选择有数据发送的接口(bytes_sent > 0)。
初始化上次总量:self.last_incoming = interfaces[self.interface_name].bytes_recv,记录初始总量。
计算每秒增量:每次循环中,用当前总量减去上次总量,得到每秒流量(如incoming_rate),再更新上次总量。
存储数据:将每秒流量和对应的时间戳存入全局变量traffic_data(deque类型,自动保留最近300条)。
Flask Web框架:搭建后端服务
Flask是轻量级Web框架,这部分有不明白的可以参考:python flask编写一个简易的web端程序(附demo)
前端部分
交互逻辑(JavaScript)
前端的核心是“实时获取后端数据并更新页面”,主要通过以下函数实现:
1. fetch():异步请求后端数据
fetch(‘/traffic-data’)会向后端/traffic-data路由发送请求,获取JSON格式的流量数据,再用这些数据更新图表。
2. Chart.js:绘制实时图表
Chart.js是轻量级前端绘图库,代码中用它绘制入站/出站流量曲线:
- 初始化图表:指定canvas元素、图表类型(line折线图)、初始数据(空)、坐标轴配置。
- 更新图表:每次fetch到新数据后,修改chart.data.labels(时间戳)和chart.data.datasets(流量数据),再调用chart.update()刷新图表。
3. 定时更新:setInterval()
用setInterval实现周期性更新:
- setInterval(updateChart, 1000):每秒更新一次图表(与后端采集频率一致)。
- setInterval(updateTotalStats, 5000):每5秒更新一次总流量(总流量变化较慢,无需频繁更新)。
4. 辅助函数:格式转换
- formatBytes():将字节数(如102400)转为易读格式(如100 KB),支持B/KB/MB/GB/TB。
- formatBytesPerSec():在formatBytes()基础上添加/s,如100 KB/s。
完整代码
完整 index 代码
<!DOCTYPE html> <html> <head> <title>Windows带宽监控</title> <script src="https://cdn.jsdelivr.net/npm/chart.js"></script> <style> body { font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; } .container { max-width: 1200px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } .header { text-align: center; margin-bottom: 20px; } .stats-container { display: flex; justify-content: space-around; margin-bottom: 20px; } .stat-card { background: #f0f0f0; padding: 15px; border-radius: 5px; text-align: center; min-width: 200px; } .stat-value { font-size: 24px; font-weight: bold; } .chart-container { position: relative; height: 400px; margin-bottom: 20px; } .controls { margin-bottom: 20px; text-align: center; } button { background: #4CAF50; color: white; border: none; padding: 10px 15px; border-radius: 4px; cursor: pointer; } button:hover { background: #45a049; } </style> </head> <body> <div class="container"> <div class="header"> <h1>Windows带宽监控工具</h1> <p>实时监控网络接口流量</p> </div> <div class="stats-container"> <div class="stat-card"> <div class="stat-title">当前入站流量</div> <div class="stat-value" id="current-in">0 B/s</div> </div> <div class="stat-card"> <div class="stat-title">当前出站流量</div> <div class="stat-value" id="current-out">0 B/s</div> </div> <div class="stat-card"> <div class="stat-title">总入站流量</div> <div class="stat-value" id="total-in">0 MB</div> </div> <div class="stat-card"> <div class="stat-title">总出站流量</div> <div class="stat-value" id="total-out">0 MB</div> </div> </div> <div class="controls"> <button onclick="downloadReport()">下载报表</button> <button onclick="changeView('minute')">最近1分钟</button> <button onclick="changeView('hour')">最近1小时</button> <button onclick="changeView('day')">最近24小时</button> </div> <div class="chart-container"> <canvas id="trafficChart"></canvas> </div> <div id="report"></div> </div> <script> // 创建图表 const ctx = document.getElementById('trafficChart').getContext('2d'); const chart = new Chart(ctx, { type: 'line', data: { labels: [], datasets: [ { label: '入站流量 (KB/s)', data: [], borderColor: 'rgba(75, 192, 192, 1)', backgroundColor: 'rgba(75, 192, 192, 0.2)', fill: true, tension: 0.4 }, { label: '出站流量 (KB/s)', data: [], borderColor: 'rgba(255, 99, 132, 1)', backgroundColor: 'rgba(255, 99, 132, 0.2)', fill: true, tension: 0.4 } ] }, options: { responsive: true, maintainAspectRatio: false, scales: { y: { beginAtZero: true, title: { display: true, text: '流量 (KB/秒)' } }, x: { title: { display: true, text: '时间' } } }, plugins: { legend: { position: 'top', }, title: { display: true, text: '网络流量监控' } } } }); // 格式化字节大小为易读格式 function formatBytes(bytes, decimals = 2) { if (bytes === 0) return '0 B'; const k = 1024; const dm = decimals < 0 ? 0 : decimals; const sizes = ['B', 'KB', 'MB', 'GB', 'TB']; const i = Math.floor(Math.log(bytes) / Math.log(k)); return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i]; } // 格式化字节/秒为易读格式 function formatBytesPerSec(bytes, decimals = 2) { return formatBytes(bytes, decimals) + '/s'; } // 更新图表数据 function updateChart() { fetch('/traffic-data') .then(response => response.json()) .then(data => { // 转换数据为KB/s const incomingKB = data.incoming.map(value => (value / 1024).toFixed(2)); const outgoingKB = data.outgoing.map(value => (value / 1024).toFixed(2)); chart.data.labels = data.timestamps; chart.data.datasets[0].data = incomingKB; chart.data.datasets[1].data = outgoingKB; chart.update(); // 更新当前流量显示 if (incomingKB.length > 0) { const currentIn = incomingKB[incomingKB.length - 1]; document.getElementById('current-in').textContent = formatBytesPerSec(currentIn * 1024); } if (outgoingKB.length > 0) { const currentOut = outgoingKB[outgoingKB.length - 1]; document.getElementById('current-out').textContent = formatBytesPerSec(currentOut * 1024); } }); } // 更新总流量统计 function updateTotalStats() { fetch('/total-traffic') .then(response => response.json()) .then(data => { document.getElementById('total-in').textContent = formatBytes(data.total_incoming); document.getElementById('total-out').textContent = formatBytes(data.total_outgoing); }); } // 更新报告 function updateReport() { fetch('/report') .then(response => response.text()) .then(data => { document.getElementById('report').innerHTML = data; }); } // 下载报表 function downloadReport() { fetch('/download-report') .then(response => response.blob()) .then(blob => { const url = window.URL.createObjectURL(blob); const a = document.createElement('a'); a.style.display = 'none'; a.href = url; a.download = 'bandwidth_report.json'; document.body.appendChild(a); a.click(); window.URL.revokeObjectURL(url); }); } // 切换视图 function changeView(range) { fetch('/change-view?range=' + range) .then(response => response.json()) .then(data => { // 重新加载页面数据 updateChart(); updateReport(); }); } // 初始加载 updateChart(); updateTotalStats(); updateReport(); // 定时更新 setInterval(updateChart, 1000); setInterval(updateTotalStats, 5000); setInterval(updateReport, 10000); </script> </body> </html>
完整后端代码
import threading import time import psutil from datetime import datetime from collections import deque import matplotlib.pyplot as plt import matplotlib.dates as mdates from flask import Flask, render_template, jsonify, request import io import base64 import json import os app = Flask(__name__) # 全局变量存储流量数据 traffic_data = { 'incoming': deque(maxlen=300), # 存储最近5分钟的入站流量(每秒一个数据点) 'outgoing': deque(maxlen=300), # 存储最近5分钟的出站流量 'timestamps': deque(maxlen=300) # 存储对应的时间戳 } # 流量统计类 class TrafficMonitor(threading.Thread): def __init__(self, interface_name=None): super().__init__() self.daemon = True self.last_incoming = 0 self.last_outgoing = 0 self.interface_name = interface_name self.running = True def run(self): print("开始监控网络流量...") # 获取网络接口 interfaces = psutil.net_io_counters(pernic=True) # 如果没有指定接口,使用第一个活动接口 if not self.interface_name: for name in interfaces: if name != 'lo' and interfaces[name].bytes_sent > 0: self.interface_name = name break if not self.interface_name: print("未找到可用的网络接口") return print(f"监控接口: {self.interface_name}") # 初始化计数器 self.last_incoming = interfaces[self.interface_name].bytes_recv self.last_outgoing = interfaces[self.interface_name].bytes_sent # 开始监控循环 while self.running: try: # 获取当前流量统计 current_stats = psutil.net_io_counters(pernic=True)[self.interface_name] current_incoming = current_stats.bytes_recv current_outgoing = current_stats.bytes_sent # 计算每秒流量 incoming_rate = current_incoming - self.last_incoming outgoing_rate = current_outgoing - self.last_outgoing # 更新计数器 self.last_incoming = current_incoming self.last_outgoing = current_outgoing # 更新全局流量数据 now = datetime.now() traffic_data['timestamps'].append(now) traffic_data['incoming'].append(incoming_rate) traffic_data['outgoing'].append(outgoing_rate) # 每秒更新一次 time.sleep(1) except Exception as e: print(f"监控出错: {e}") time.sleep(5) def stop(self): self.running = False # 创建并启动流量监控线程 monitor = TrafficMonitor() # Flask路由 @app.route('/') def index(): return render_template('index.html') @app.route('/traffic-data') def get_traffic_data(): # 返回JSON格式的流量数据 data = { 'timestamps': [ts.strftime('%H:%M:%S') for ts in traffic_data['timestamps']], 'incoming': list(traffic_data['incoming']), 'outgoing': list(traffic_data['outgoing']) } return jsonify(data) @app.route('/total-traffic') def get_total_traffic(): # 获取总流量统计 interfaces = psutil.net_io_counters(pernic=True) interface_name = monitor.interface_name total_incoming = interfaces[interface_name].bytes_recv if interface_name in interfaces else 0 total_outgoing = interfaces[interface_name].bytes_sent if interface_name in interfaces else 0 return jsonify({ 'total_incoming': total_incoming, 'total_outgoing': total_outgoing }) @app.route('/traffic-plot') def get_traffic_plot(): # 生成流量图表并返回base64编码的图像 if not traffic_data['timestamps']: return "暂无数据" plt.figure(figsize=(10, 6)) # 转换数据为KB/s incoming_kb = [x / 1024 for x in traffic_data['incoming']] outgoing_kb = [x / 1024 for x in traffic_data['outgoing']] plt.plot(traffic_data['timestamps'], incoming_kb, label='入站流量 (KB/s)') plt.plot(traffic_data['timestamps'], outgoing_kb, label='出站流量 (KB/s)') # 格式化图表 plt.xlabel('时间') plt.ylabel('流量 (KB/秒)') plt.title('实时网络流量监控') plt.legend() plt.grid(True) plt.gcf().autofmt_xdate() plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S')) # 将图表转换为base64编码 img = io.BytesIO() plt.savefig(img, format='png') img.seek(0) plot_url = base64.b64encode(img.getvalue()).decode() plt.close() return f'<img src="data:image/png;base64,{plot_url}">' @app.route('/report') def generate_report(): # 生成带宽使用报告 if not traffic_data['timestamps']: return "<p>暂无数据可生成报告</p>" # 计算统计信息 total_in = sum(traffic_data['incoming']) total_out = sum(traffic_data['outgoing']) avg_in = total_in / len(traffic_data['incoming']) avg_out = total_out / len(traffic_data['outgoing']) max_in = max(traffic_data['incoming']) max_out = max(traffic_data['outgoing']) # 转换为更友好的单位 def format_bytes(bytes): for unit in ['B', 'KB', 'MB', 'GB']: if bytes < 1024.0: return f"{bytes:.2f} {unit}" bytes /= 1024.0 return f"{bytes:.2f} TB" def format_bps(bytes_per_sec): return format_bytes(bytes_per_sec) + "/s" report = f""" <h2>带宽使用报告</h2> <p>统计时间段: {traffic_data['timestamps'][0]} 到 {traffic_data['timestamps'][-1]}</p> <p>监控时长: {len(traffic_data['timestamps'])} 秒</p> <p>总入站流量: {format_bytes(total_in)}</p> <p>总出站流量: {format_bytes(total_out)}</p> <p>平均入站速率: {format_bps(avg_in)}</p> <p>平均出站速率: {format_bps(avg_out)}</p> <p>最大入站速率: {format_bps(max_in)}</p> <p>最大出站速率: {format_bps(max_out)}</p> """ return report @app.route('/download-report') def download_report(): # 生成并下载JSON格式的详细报告 if not traffic_data['timestamps']: return "暂无数据", 404 # 准备报告数据 report_data = { "generated_at": datetime.now().isoformat(), "time_period": { "start": traffic_data['timestamps'][0].isoformat() if traffic_data['timestamps'] else None, "end": traffic_data['timestamps'][-1].isoformat() if traffic_data['timestamps'] else None, "duration_seconds": len(traffic_data['timestamps']) }, "traffic_data": { "timestamps": [ts.isoformat() for ts in traffic_data['timestamps']], "incoming_bytes_per_sec": list(traffic_data['incoming']), "outgoing_bytes_per_sec": list(traffic_data['outgoing']) }, "statistics": { "total_incoming_bytes": sum(traffic_data['incoming']), "total_outgoing_bytes": sum(traffic_data['outgoing']), "avg_incoming_bytes_per_sec": sum(traffic_data['incoming']) / len(traffic_data['incoming']), "avg_outgoing_bytes_per_sec": sum(traffic_data['outgoing']) / len(traffic_data['outgoing']), "max_incoming_bytes_per_sec": max(traffic_data['incoming']) if traffic_data['incoming'] else 0, "max_outgoing_bytes_per_sec": max(traffic_data['outgoing']) if traffic_data['outgoing'] else 0 } } # 转换为JSON字符串 report_json = json.dumps(report_data, indent=2) # 创建响应 from flask import Response response = Response( report_json, mimetype="application/json", headers={"Content-Disposition": "attachment;filename=bandwidth_report.json"} ) return response @app.route('/change-view') def change_view(): # 改变数据视图范围 range = request.args.get('range', 'minute') # 根据范围调整数据保留数量 if range == 'minute': new_maxlen = 60 # 1分钟 elif range == 'hour': new_maxlen = 3600 # 1小时 elif range == 'day': new_maxlen = 86400 # 24小时 else: new_maxlen = 300 # 默认5分钟 # 创建新的deque并复制现有数据 def resize_deque(old_deque, new_maxlen): new_deque = deque(maxlen=new_maxlen) for item in old_deque: new_deque.append(item) return new_deque traffic_data['incoming'] = resize_deque(traffic_data['incoming'], new_maxlen) traffic_data['outgoing'] = resize_deque(traffic_data['outgoing'], new_maxlen) traffic_data['timestamps'] = resize_deque(traffic_data['timestamps'], new_maxlen) return jsonify({"status": "success", "new_maxlen": new_maxlen}) if __name__ == '__main__': # 启动流量监控线程 monitor.start() # 启动Flask应用 print("启动带宽监控Web界面...") print("请访问 http://localhost:5000") app.run(debug=True, host='0.0.0.0', port=5000, use_reloader=False)
以上就是基于Python实现Windows带宽监控工具的详细内容,更多关于Python带宽监控的资料请关注脚本之家其它相关文章!