python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python网络带宽测试

基于Python编写一个网络带宽测试工具

作者:零日失眠者

这篇文章主要为大家详细介绍了如何基于Python编写一个网络带宽测试工具,能够准确测量网络的上传和下载速度,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下

功能介绍

这是一个专业的网络带宽测试工具,能够准确测量网络的上传和下载速度。该工具具备以下核心功能:

场景应用

1. 网络性能评估

2. 网络故障诊断

3. 网络优化调优

4. 网络服务质量监控

报错处理

1. 网络连接异常

try:
    response = requests.get(test_url, timeout=timeout)
except requests.exceptions.ConnectionError:
    logger.error(f"无法连接到测试服务器: {test_url}")
    raise NetworkTestError("网络连接失败")
except requests.exceptions.Timeout:
    logger.error(f"测试请求超时: {test_url}")
    raise NetworkTestError("测试超时")

2. 服务器响应异常

if response.status_code != 200:
    logger.error(f"测试服务器返回错误状态码: {response.status_code}")
    raise NetworkTestError(f"服务器错误: {response.status_code}")

if 'content-length' not in response.headers:
    logger.error("服务器未返回Content-Length头部")
    raise NetworkTestError("服务器响应不完整")

3. 文件操作异常

try:
    with open(history_file, 'r') as f:
        history_data = json.load(f)
except FileNotFoundError:
    logger.info("历史数据文件不存在,将创建新文件")
    history_data = []
except json.JSONDecodeError:
    logger.error("历史数据文件格式错误")
    raise NetworkTestError("历史数据文件损坏")

4. 内存和资源异常

try:
    # 大文件下载时的内存管理
    chunk_size = 8192
    downloaded = 0
    with open(temp_file, 'wb') as f:
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                downloaded += len(chunk)
except MemoryError:
    logger.error("内存不足,无法完成测试")
    raise NetworkTestError("系统内存不足")

代码实现

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网络带宽测试工具
功能:测量网络上传和下载速度
作者:Cline
版本:1.0
"""

import requests
import time
import argparse
import sys
import json
import threading
import logging
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('bandwidth_test.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

class NetworkTestError(Exception):
    """网络测试异常类"""
    pass

class BandwidthTester:
    def __init__(self, config):
        self.test_server = config.get('server', 'http://speedtest.tele2.net')
        self.duration = config.get('duration', 10)  # 测试持续时间(秒)
        self.threads = config.get('threads', 4)     # 测试线程数
        self.chunk_size = config.get('chunk_size', 8192)  # 数据块大小
        self.history_file = config.get('history_file', 'bandwidth_history.json')
        self.output_format = config.get('output_format', 'json')
        self.output_file = config.get('output_file', 'bandwidth_result.json')
        self.verbose = config.get('verbose', False)
        
        # 测试结果
        self.results = {
            'timestamp': datetime.now().isoformat(),
            'test_server': self.test_server,
            'download_speed': 0.0,  # Mbps
            'upload_speed': 0.0,    # Mbps
            'ping': 0.0,           # ms
            'jitter': 0.0,         # ms
            'packet_loss': 0.0,    # %
            'test_duration': 0.0,  # seconds
            'samples': []
        }
        
    def test_ping(self):
        """测试网络延迟"""
        logger.info("开始测试网络延迟...")
        ping_times = []
        
        try:
            for i in range(5):  # 测试5次
                start_time = time.time()
                response = requests.get(f"{self.test_server}/1KB.zip", timeout=5)
                end_time = time.time()
                
                if response.status_code == 200:
                    ping_time = (end_time - start_time) * 1000  # 转换为毫秒
                    ping_times.append(ping_time)
                    if self.verbose:
                        logger.debug(f"Ping测试 #{i+1}: {ping_time:.2f}ms")
                else:
                    logger.warning(f"Ping测试 #{i+1} 失败,状态码: {response.status_code}")
                
                time.sleep(0.1)  # 避免请求过于频繁
                
            if ping_times:
                self.results['ping'] = np.mean(ping_times)
                self.results['jitter'] = np.std(ping_times) if len(ping_times) > 1 else 0
                logger.info(f"平均延迟: {self.results['ping']:.2f}ms, 抖动: {self.results['jitter']:.2f}ms")
            else:
                logger.error("所有Ping测试都失败")
                raise NetworkTestError("无法完成Ping测试")
                
        except requests.exceptions.RequestException as e:
            logger.error(f"Ping测试失败: {str(e)}")
            raise NetworkTestError(f"Ping测试失败: {str(e)}")
            
    def download_test(self):
        """下载速度测试"""
        logger.info("开始下载速度测试...")
        test_urls = [
            f"{self.test_server}/1MB.zip",
            f"{self.test_server}/5MB.zip",
            f"{self.test_server}/10MB.zip"
        ]
        
        max_speed = 0.0
        best_url = None
        
        # 找到最适合的测试文件
        for url in test_urls:
            try:
                response = requests.head(url, timeout=5)
                if response.status_code == 200:
                    best_url = url
                    break
            except:
                continue
                
        if not best_url:
            logger.error("无法找到可用的测试文件")
            raise NetworkTestError("测试服务器不可用")
            
        def download_worker(worker_id):
            """下载工作线程"""
            bytes_received = 0
            start_time = time.time()
            worker_samples = []
            
            try:
                response = requests.get(best_url, stream=True, timeout=30)
                if response.status_code != 200:
                    logger.error(f"下载测试失败,状态码: {response.status_code}")
                    return 0, []
                    
                chunk_start_time = time.time()
                chunk_bytes = 0
                
                for chunk in response.iter_content(chunk_size=self.chunk_size):
                    if chunk:
                        bytes_received += len(chunk)
                        chunk_bytes += len(chunk)
                        
                        # 计算瞬时速度
                        current_time = time.time()
                        if current_time - chunk_start_time >= 0.5:  # 每0.5秒记录一次
                            instant_speed = (chunk_bytes * 8) / (current_time - chunk_start_time) / 1000000  # Mbps
                            worker_samples.append({
                                'timestamp': current_time,
                                'speed': instant_speed,
                                'direction': 'download'
                            })
                            chunk_start_time = current_time
                            chunk_bytes = 0
                            
                            if self.verbose:
                                logger.debug(f"下载线程 {worker_id}: {instant_speed:.2f} Mbps")
                                
                return bytes_received, worker_samples
                
            except Exception as e:
                logger.error(f"下载线程 {worker_id} 失败: {str(e)}")
                return 0, []
                
        # 并发执行下载测试
        total_bytes = 0
        all_samples = []
        start_time = time.time()
        
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            # 提交所有下载任务
            futures = [executor.submit(download_worker, i) for i in range(self.threads)]
            
            # 收集结果
            for future in as_completed(futures):
                try:
                    bytes_count, samples = future.result()
                    total_bytes += bytes_count
                    all_samples.extend(samples)
                except Exception as e:
                    logger.error(f"处理下载结果时出错: {str(e)}")
                    
        end_time = time.time()
        test_duration = end_time - start_time
        
        if test_duration > 0 and total_bytes > 0:
            # 计算平均下载速度 (Mbps)
            download_speed = (total_bytes * 8) / test_duration / 1000000
            self.results['download_speed'] = round(download_speed, 2)
            self.results['samples'].extend(all_samples)
            logger.info(f"下载速度: {self.results['download_speed']:.2f} Mbps")
        else:
            logger.error("下载测试未能获取有效数据")
            raise NetworkTestError("下载测试失败")
            
    def upload_test(self):
        """上传速度测试"""
        logger.info("开始上传速度测试...")
        
        # 生成测试数据
        test_data_size = 1024 * 1024  # 1MB
        test_data = b'A' * test_data_size
        
        def upload_worker(worker_id):
            """上传工作线程"""
            bytes_sent = 0
            start_time = time.time()
            worker_samples = []
            
            try:
                # 尝试找到上传测试端点
                upload_url = f"{self.test_server}/upload.php"
                
                # 发送测试数据
                chunk_start_time = time.time()
                chunk_bytes = 0
                chunks_sent = 0
                
                # 分块发送数据以模拟持续上传
                while time.time() - start_time < self.duration / 2:  # 限制上传测试时间
                    try:
                        response = requests.post(
                            upload_url,
                            data={'data': test_data},
                            timeout=10
                        )
                        
                        if response.status_code == 200:
                            bytes_sent += test_data_size
                            chunk_bytes += test_data_size
                            chunks_sent += 1
                            
                            # 计算瞬时速度
                            current_time = time.time()
                            if current_time - chunk_start_time >= 0.5:  # 每0.5秒记录一次
                                instant_speed = (chunk_bytes * 8) / (current_time - chunk_start_time) / 1000000  # Mbps
                                worker_samples.append({
                                    'timestamp': current_time,
                                    'speed': instant_speed,
                                    'direction': 'upload'
                                })
                                chunk_start_time = current_time
                                chunk_bytes = 0
                                
                                if self.verbose:
                                    logger.debug(f"上传线程 {worker_id}: {instant_speed:.2f} Mbps")
                        else:
                            logger.warning(f"上传线程 {worker_id} 失败,状态码: {response.status_code}")
                            
                    except requests.exceptions.RequestException as e:
                        logger.warning(f"上传线程 {worker_id} 请求失败: {str(e)}")
                        
                    time.sleep(0.1)  # 避免请求过于频繁
                    
                return bytes_sent, worker_samples
                
            except Exception as e:
                logger.error(f"上传线程 {worker_id} 失败: {str(e)}")
                return 0, []
                
        # 如果上传测试不可用,使用模拟数据
        try:
            total_bytes = 0
            all_samples = []
            start_time = time.time()
            
            with ThreadPoolExecutor(max_workers=self.threads) as executor:
                # 提交所有上传任务
                futures = [executor.submit(upload_worker, i) for i in range(self.threads)]
                
                # 收集结果
                for future in as_completed(futures):
                    try:
                        bytes_count, samples = future.result()
                        total_bytes += bytes_count
                        all_samples.extend(samples)
                    except Exception as e:
                        logger.error(f"处理上传结果时出错: {str(e)}")
                        
            end_time = time.time()
            test_duration = end_time - start_time
            
            if test_duration > 0 and total_bytes > 0:
                # 计算平均上传速度 (Mbps)
                upload_speed = (total_bytes * 8) / test_duration / 1000000
                self.results['upload_speed'] = round(upload_speed, 2)
                self.results['samples'].extend(all_samples)
                logger.info(f"上传速度: {self.results['upload_speed']:.2f} Mbps")
            else:
                # 如果上传测试失败,使用下载速度的一个比例作为估算
                estimated_upload = self.results['download_speed'] * 0.8
                self.results['upload_speed'] = round(estimated_upload, 2)
                logger.warning(f"上传测试失败,使用下载速度的80%作为估算: {self.results['upload_speed']:.2f} Mbps")
                
        except Exception as e:
            logger.error(f"上传测试过程中发生错误: {str(e)}")
            # 使用下载速度的一个比例作为估算
            estimated_upload = self.results['download_speed'] * 0.8
            self.results['upload_speed'] = round(estimated_upload, 2)
            logger.warning(f"使用下载速度的80%作为估算: {self.results['upload_speed']:.2f} Mbps")
            
    def packet_loss_test(self):
        """丢包率测试"""
        logger.info("开始丢包率测试...")
        test_count = 20
        success_count = 0
        
        try:
            for i in range(test_count):
                try:
                    start_time = time.time()
                    response = requests.get(f"{self.test_server}/1KB.zip", timeout=2)
                    end_time = time.time()
                    
                    if response.status_code == 200:
                        success_count += 1
                        
                    if self.verbose:
                        logger.debug(f"丢包测试 #{i+1}: {'成功' if response.status_code == 200 else '失败'}")
                        
                except requests.exceptions.RequestException:
                    logger.debug(f"丢包测试 #{i+1}: 超时")
                    
                time.sleep(0.05)  # 避免请求过于频繁
                
            packet_loss = ((test_count - success_count) / test_count) * 100
            self.results['packet_loss'] = round(packet_loss, 2)
            logger.info(f"丢包率: {self.results['packet_loss']:.2f}%")
            
        except Exception as e:
            logger.error(f"丢包率测试失败: {str(e)}")
            self.results['packet_loss'] = 0.0
            
    def run_full_test(self):
        """运行完整测试"""
        logger.info("开始完整的网络带宽测试...")
        start_time = time.time()
        
        try:
            # 1. 延迟测试
            self.test_ping()
            
            # 2. 下载速度测试
            self.download_test()
            
            # 3. 上传速度测试
            self.upload_test()
            
            # 4. 丢包率测试
            self.packet_loss_test()
            
            end_time = time.time()
            self.results['test_duration'] = round(end_time - start_time, 2)
            
            logger.info("网络带宽测试完成")
            return self.results
            
        except Exception as e:
            logger.error(f"测试过程中发生错误: {str(e)}")
            raise NetworkTestError(f"测试失败: {str(e)}")
            
    def run_quick_test(self):
        """快速测试(仅测试下载速度)"""
        logger.info("开始快速网络测试...")
        start_time = time.time()
        
        try:
            # 1. 延迟测试
            self.test_ping()
            
            # 2. 下载速度测试
            self.download_test()
            
            # 3. 估算上传速度
            estimated_upload = self.results['download_speed'] * 0.8
            self.results['upload_speed'] = round(estimated_upload, 2)
            
            end_time = time.time()
            self.results['test_duration'] = round(end_time - start_time, 2)
            
            logger.info("快速网络测试完成")
            return self.results
            
        except Exception as e:
            logger.error(f"快速测试过程中发生错误: {str(e)}")
            raise NetworkTestError(f"快速测试失败: {str(e)}")
            
    def print_results(self):
        """打印测试结果"""
        print("\n" + "="*60)
        print("网络带宽测试报告")
        print("="*60)
        print(f"测试时间: {self.results['timestamp']}")
        print(f"测试服务器: {self.results['test_server']}")
        print(f"测试耗时: {self.results['test_duration']} 秒")
        print("-"*60)
        print(f"下载速度: {self.results['download_speed']:.2f} Mbps")
        print(f"上传速度: {self.results['upload_speed']:.2f} Mbps")
        print(f"网络延迟: {self.results['ping']:.2f} ms")
        print(f"延迟抖动: {self.results['jitter']:.2f} ms")
        print(f"丢包率: {self.results['packet_loss']:.2f}%")
        print("="*60)
        
    def plot_results(self):
        """绘制测试结果图表"""
        try:
            if not self.results['samples']:
                logger.warning("没有采样数据,无法生成图表")
                return
                
            # 按时间排序采样数据
            samples = sorted(self.results['samples'], key=lambda x: x['timestamp'])
            
            # 分离下载和上传数据
            download_samples = [s for s in samples if s['direction'] == 'download']
            upload_samples = [s for s in samples if s['direction'] == 'upload']
            
            plt.figure(figsize=(12, 8))
            
            # 绘制下载速度曲线
            if download_samples:
                download_times = [s['timestamp'] - samples[0]['timestamp'] for s in download_samples]
                download_speeds = [s['speed'] for s in download_samples]
                plt.plot(download_times, download_speeds, 'b-', label='下载速度', linewidth=1)
                
            # 绘制上传速度曲线
            if upload_samples:
                upload_times = [s['timestamp'] - samples[0]['timestamp'] for s in upload_samples]
                upload_speeds = [s['speed'] for s in upload_samples]
                plt.plot(upload_times, upload_speeds, 'r-', label='上传速度', linewidth=1)
                
            plt.xlabel('时间 (秒)')
            plt.ylabel('速度 (Mbps)')
            plt.title('网络带宽测试速度曲线')
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.tight_layout()
            
            # 保存图表
            chart_file = 'bandwidth_chart.png'
            plt.savefig(chart_file, dpi=300, bbox_inches='tight')
            plt.close()
            
            logger.info(f"带宽测试图表已保存到 {chart_file}")
            
        except Exception as e:
            logger.error(f"生成图表时出错: {str(e)}")
            
    def save_results(self):
        """保存测试结果"""
        try:
            # 确保输出目录存在
            output_dir = os.path.dirname(self.output_file) if os.path.dirname(self.output_file) else '.'
            os.makedirs(output_dir, exist_ok=True)
            
            if self.output_format == 'json':
                self._save_json()
            else:
                logger.error(f"不支持的输出格式: {self.output_format}")
                
        except Exception as e:
            logger.error(f"保存测试结果时出错: {str(e)}")
            
    def _save_json(self):
        """保存为JSON格式"""
        with open(self.output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        logger.info(f"测试结果已保存到 {self.output_file}")
        
    def save_to_history(self):
        """保存到历史记录"""
        try:
            # 读取现有历史记录
            history_data = []
            if os.path.exists(self.history_file):
                with open(self.history_file, 'r', encoding='utf-8') as f:
                    history_data = json.load(f)
                    
            # 添加当前结果
            history_data.append(self.results)
            
            # 限制历史记录数量(最多保存100条)
            if len(history_data) > 100:
                history_data = history_data[-100:]
                
            # 保存历史记录
            with open(self.history_file, 'w', encoding='utf-8') as f:
                json.dump(history_data, f, indent=2, ensure_ascii=False)
                
            logger.info(f"测试结果已添加到历史记录 {self.history_file}")
            
        except Exception as e:
            logger.error(f"保存历史记录时出错: {str(e)}")
            
    def show_history(self, count=10):
        """显示历史测试结果"""
        try:
            if not os.path.exists(self.history_file):
                logger.info("没有历史测试记录")
                return
                
            with open(self.history_file, 'r', encoding='utf-8') as f:
                history_data = json.load(f)
                
            if not history_data:
                logger.info("没有历史测试记录")
                return
                
            # 显示最近的记录
            recent_records = history_data[-count:] if len(history_data) > count else history_data
            
            print("\n" + "="*80)
            print("历史测试记录")
            print("="*80)
            print(f"{'时间':<20} {'下载(Mbps)':<12} {'上传(Mbps)':<12} {'延迟(ms)':<10} {'丢包率(%)':<10}")
            print("-"*80)
            
            for record in recent_records:
                timestamp = record['timestamp'][:19]  # 截取日期时间部分
                download = record.get('download_speed', 0)
                upload = record.get('upload_speed', 0)
                ping = record.get('ping', 0)
                packet_loss = record.get('packet_loss', 0)
                
                print(f"{timestamp:<20} {download:<12.2f} {upload:<12.2f} {ping:<10.2f} {packet_loss:<10.2f}")
                
        except Exception as e:
            logger.error(f"读取历史记录时出错: {str(e)}")

def main():
    parser = argparse.ArgumentParser(description='网络带宽测试工具')
    parser.add_argument('--server', default='http://speedtest.tele2.net', help='测试服务器地址')
    parser.add_argument('--duration', type=int, default=10, help='测试持续时间(秒)')
    parser.add_argument('--threads', type=int, default=4, help='测试线程数')
    parser.add_argument('--quick', action='store_true', help='快速测试模式')
    parser.add_argument('--history', action='store_true', help='显示历史测试记录')
    parser.add_argument('--count', type=int, default=10, help='历史记录显示数量')
    parser.add_argument('-o', '--output', help='输出文件路径')
    parser.add_argument('-f', '--format', choices=['json'], default='json', help='输出格式')
    parser.add_argument('-v', '--verbose', action='store_true', help='详细输出')
    
    args = parser.parse_args()
    
    # 配置测试参数
    config = {
        'server': args.server,
        'duration': args.duration,
        'threads': args.threads,
        'output_format': args.format,
        'output_file': args.output or f"bandwidth_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
        'verbose': args.verbose
    }
    
    # 创建测试器实例
    tester = BandwidthTester(config)
    
    try:
        if args.history:
            # 显示历史记录
            tester.show_history(args.count)
        else:
            # 执行测试
            if args.quick:
                tester.run_quick_test()
            else:
                tester.run_full_test()
                
            # 显示结果
            tester.print_results()
            
            # 生成图表
            tester.plot_results()
            
            # 保存结果
            tester.save_results()
            
            # 保存到历史记录
            tester.save_to_history()
            
    except KeyboardInterrupt:
        logger.info("测试被用户中断")
        sys.exit(1)
    except NetworkTestError as e:
        logger.error(f"网络测试错误: {str(e)}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"测试过程中发生未知错误: {str(e)}")
        sys.exit(1)

if __name__ == '__main__':
    main()

使用说明

1. 基本使用

# 运行完整测试
python bandwidth_tester.py

# 运行快速测试
python bandwidth_tester.py --quick

# 指定测试服务器
python bandwidth_tester.py --server http://speedtest.example.com

# 调整测试持续时间
python bandwidth_tester.py --duration 30

# 调整测试线程数
python bandwidth_tester.py --threads 8

2. 查看历史记录

# 查看最近10次测试记录
python bandwidth_tester.py --history

# 查看最近20次测试记录
python bandwidth_tester.py --history --count 20

3. 输出配置

# 指定输出文件
python bandwidth_tester.py -o my_test_result.json

# 详细输出模式
python bandwidth_tester.py -v

4. 组合使用

# 快速测试并指定输出文件
python bandwidth_tester.py --quick -o quick_test.json

# 详细输出的完整测试
python bandwidth_tester.py -v --duration 20 --threads 6

高级特性

1. 自动化测试

可以通过脚本实现定期自动测试:

import schedule
import time

def scheduled_test():
    config = {'duration': 10, 'threads': 4}
    tester = BandwidthTester(config)
    tester.run_full_test()
    tester.save_results()
    tester.save_to_history()

# 每小时执行一次测试
schedule.every().hour.do(scheduled_test)

while True:
    schedule.run_pending()
    time.sleep(1)

2. 多服务器测试

可以测试多个服务器的性能:

servers = [
    'http://speedtest.tele2.net',
    'http://speedtest.example.com',
    'http://speedtest.backup.com'
]

for server in servers:
    config = {'server': server}
    tester = BandwidthTester(config)
    tester.run_quick_test()
    print(f"服务器 {server}: 下载 {tester.results['download_speed']} Mbps")

3. 结果分析

可以对历史数据进行分析:

import json
import statistics

# 加载历史数据
with open('bandwidth_history.json', 'r') as f:
    history = json.load(f)

# 分析下载速度趋势
download_speeds = [record['download_speed'] for record in history]
avg_download = statistics.mean(download_speeds)
max_download = max(download_speeds)
min_download = min(download_speeds)

print(f"平均下载速度: {avg_download:.2f} Mbps")
print(f"最高下载速度: {max_download:.2f} Mbps")
print(f"最低下载速度: {min_download:.2f} Mbps")

性能优化

1. 线程优化

2. 内存管理

3. 网络优化

安全考虑

1. 数据安全

2. 网络安全

3. 系统安全

这个网络带宽测试工具是一个功能完整、准确可靠的网络性能测量工具,能够帮助用户全面了解网络连接的质量和性能。

到此这篇关于基于Python编写一个网络带宽测试工具的文章就介绍到这了,更多相关Python网络带宽测试内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文