python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python网络连通性检查

Python开发一个实用的网络连通性检查工具

作者:零日失眠者

网络连通性是现代计算机系统正常运行的基础,本文将介绍一个实用的Python网络连通性检查工具,可以全面检查网络连接状态,帮助用户快速诊断网络问题,有需要的小伙伴可以了解下

简介

网络连通性是现代计算机系统正常运行的基础。无论是访问互联网、连接内部网络还是与其他设备通信,网络连接的稳定性都至关重要。当网络出现问题时,快速诊断和定位故障点是解决问题的关键。本文将介绍一个实用的Python脚本——网络连通性检查工具,它可以全面检查网络连接状态,帮助用户快速诊断网络问题。

功能介绍

这个网络连通性检查工具具有以下核心功能:

应用场景

这个工具适用于以下场景:

报错处理

脚本包含了完善的错误处理机制:

代码实现

import os
import sys
import subprocess
import socket
import argparse
import json
import time
from datetime import datetime
import threading
import requests
from urllib.parse import urlparse

class NetworkConnectivityChecker:
    def __init__(self):
        self.results = []
        self.errors = []
        self.monitoring = False
        
    def ping_host(self, host, count=4, timeout=3):
        """Ping测试"""
        try:
            # 根据操作系统选择ping命令
            if os.name == 'nt':  # Windows
                cmd = ['ping', '-n', str(count), '-w', str(timeout * 1000), host]
            else:  # Linux/macOS
                cmd = ['ping', '-c', str(count), '-W', str(timeout), host]
                
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout * count + 5)
            
            if result.returncode == 0:
                # 解析ping输出获取统计信息
                output = result.stdout
                packet_loss = 0
                avg_rtt = 0
                
                if os.name == 'nt':  # Windows
                    # Windows ping输出解析
                    for line in output.split('\n'):
                        if 'Lost' in line:
                            # 匹配 "Lost = X (X% loss)"
                            import re
                            match = re.search(r'\((\d+)% loss\)', line)
                            if match:
                                packet_loss = int(match.group(1))
                        elif 'Average' in line:
                            # 匹配 "Average = Xms"
                            import re
                            match = re.search(r'Average = (\d+)ms', line)
                            if match:
                                avg_rtt = int(match.group(1))
                else:  # Linux/macOS
                    # Unix-like系统ping输出解析
                    for line in output.split('\n'):
                        if 'packet loss' in line:
                            # 匹配 "X packets transmitted, Y received, Z% packet loss"
                            import re
                            match = re.search(r'(\d+)% packet loss', line)
                            if match:
                                packet_loss = int(match.group(1))
                        elif 'rtt' in line:
                            # 匹配 "rtt min/avg/max/mdev = X/Y/Z/W ms"
                            import re
                            match = re.search(r'rtt [^=]+= [^/]+/([^/]+)/', line)
                            if match:
                                avg_rtt = float(match.group(1))
                                
                return {
                    'host': host,
                    'status': 'reachable',
                    'packet_loss': packet_loss,
                    'avg_rtt': avg_rtt,
                    'output': output
                }
            else:
                return {
                    'host': host,
                    'status': 'unreachable',
                    'error': result.stderr or 'Host unreachable',
                    'output': result.stdout
                }
                
        except subprocess.TimeoutExpired:
            return {
                'host': host,
                'status': 'timeout',
                'error': f'Ping timeout after {timeout * count + 5} seconds'
            }
        except Exception as e:
            return {
                'host': host,
                'status': 'error',
                'error': str(e)
            }
            
    def check_port(self, host, port, timeout=3):
        """端口检查"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            
            result = sock.connect_ex((host, port))
            sock.close()
            
            if result == 0:
                return {
                    'host': host,
                    'port': port,
                    'status': 'open'
                }
            else:
                return {
                    'host': host,
                    'port': port,
                    'status': 'closed',
                    'error': f'Connection refused (code: {result})'
                }
                
        except socket.gaierror as e:
            return {
                'host': host,
                'port': port,
                'status': 'dns_error',
                'error': f'DNS resolution failed: {e}'
            }
        except Exception as e:
            return {
                'host': host,
                'port': port,
                'status': 'error',
                'error': str(e)
            }
            
    def resolve_dns(self, hostname):
        """DNS解析测试"""
        try:
            ip_addresses = socket.getaddrinfo(hostname, None)
            ips = []
            for addr_info in ip_addresses:
                ip = addr_info[4][0]
                if ip not in ips:
                    ips.append(ip)
                    
            return {
                'hostname': hostname,
                'status': 'resolved',
                'ip_addresses': ips
            }
        except socket.gaierror as e:
            return {
                'hostname': hostname,
                'status': 'unresolved',
                'error': str(e)
            }
        except Exception as e:
            return {
                'hostname': hostname,
                'status': 'error',
                'error': str(e)
            }
            
    def http_check(self, url, timeout=10):
        """HTTP连通性测试"""
        try:
            parsed_url = urlparse(url)
            if not parsed_url.scheme:
                url = 'http://' + url
                
            response = requests.get(url, timeout=timeout)
            
            return {
                'url': url,
                'status': 'accessible',
                'status_code': response.status_code,
                'response_time': response.elapsed.total_seconds(),
                'content_length': len(response.content)
            }
        except requests.exceptions.Timeout:
            return {
                'url': url,
                'status': 'timeout',
                'error': f'Request timeout after {timeout} seconds'
            }
        except requests.exceptions.ConnectionError as e:
            return {
                'url': url,
                'status': 'connection_error',
                'error': str(e)
            }
        except requests.exceptions.RequestException as e:
            return {
                'url': url,
                'status': 'error',
                'error': str(e)
            }
        except Exception as e:
            return {
                'url': url,
                'status': 'error',
                'error': str(e)
            }
            
    def check_target(self, target):
        """检查单个目标"""
        result = {
            'target': target,
            'timestamp': datetime.now().isoformat(),
            'tests': {}
        }
        
        # 判断目标类型
        if target.startswith('http://') or target.startswith('https://'):
            # HTTP测试
            result['tests']['http'] = self.http_check(target)
        elif ':' in target and not target.replace(':', '').replace('.', '').isdigit():
            # 主机名:端口格式
            host, port = target.rsplit(':', 1)
            if port.isdigit():
                result['tests']['ping'] = self.ping_host(host)
                result['tests']['port'] = self.check_port(host, int(port))
                result['tests']['dns'] = self.resolve_dns(host)
            else:
                result['tests']['ping'] = self.ping_host(target)
                result['tests']['dns'] = self.resolve_dns(target)
        elif target.replace('.', '').isdigit() or not target.replace('-', '').replace('.', '').isdigit():
            # IP地址或主机名
            result['tests']['ping'] = self.ping_host(target)
            result['tests']['dns'] = self.resolve_dns(target)
        else:
            # 默认进行ping测试
            result['tests']['ping'] = self.ping_host(target)
            
        return result
        
    def check_multiple_targets(self, targets):
        """检查多个目标"""
        print(f"开始检查 {len(targets)} 个目标")
        print("=" * 60)
        
        self.results = []
        
        for target in targets:
            print(f"检查目标: {target}")
            result = self.check_target(target)
            self.results.append(result)
            
            # 显示结果
            tests = result['tests']
            for test_type, test_result in tests.items():
                status_icons = {
                    'reachable': '✅',
                    'open': '✅',
                    'resolved': '✅',
                    'accessible': '✅',
                    'unreachable': '❌',
                    'closed': '❌',
                    'unresolved': '❌',
                    'timeout': '⏰',
                    'error': '⚠️',
                    'connection_error': '❌',
                    'dns_error': '❌'
                }
                
                icon = status_icons.get(test_result['status'], '❓')
                print(f"  {test_type.upper()}: {icon} {test_result['status']}")
                
                if 'error' in test_result:
                    print(f"    错误: {test_result['error']}")
                elif test_type == 'ping':
                    if 'packet_loss' in test_result:
                        print(f"    丢包率: {test_result['packet_loss']}%")
                    if 'avg_rtt' in test_result and test_result['avg_rtt'] > 0:
                        print(f"    平均延迟: {test_result['avg_rtt']}ms")
                elif test_type == 'port':
                    print(f"    端口: {test_result['port']}")
                elif test_type == 'http':
                    if 'status_code' in test_result:
                        print(f"    状态码: {test_result['status_code']}")
                    if 'response_time' in test_result:
                        print(f"    响应时间: {test_result['response_time']:.3f}s")
                elif test_type == 'dns':
                    if 'ip_addresses' in test_result:
                        print(f"    IP地址: {', '.join(test_result['ip_addresses'])}")
                        
            print()
            
        return self.results
        
    def generate_report(self):
        """生成检查报告"""
        report = []
        report.append("=" * 80)
        report.append("网络连通性检查报告")
        report.append("=" * 80)
        report.append(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"检查目标数: {len(self.results)}")
        report.append("")
        
        # 统计信息
        stats = {
            'reachable': 0,
            'unreachable': 0,
            'timeout': 0,
            'error': 0
        }
        
        for result in self.results:
            for test_result in result['tests'].values():
                status = test_result['status']
                if status in stats:
                    stats[status] += 1
                elif status not in ['open', 'resolved', 'accessible']:
                    stats['error'] += 1
                    
        report.append("状态统计:")
        report.append(f"  可达: {stats['reachable']}")
        report.append(f"  不可达: {stats['unreachable']}")
        report.append(f"  超时: {stats['timeout']}")
        report.append(f"  错误: {stats['error']}")
        report.append("")
        
        # 详细结果
        report.append("详细检查结果:")
        report.append("-" * 50)
        for result in self.results:
            report.append(f"目标: {result['target']}")
            for test_type, test_result in result['tests'].items():
                status_display = {
                    'reachable': '✅ 可达',
                    'open': '✅ 开放',
                    'resolved': '✅ 已解析',
                    'accessible': '✅ 可访问',
                    'unreachable': '❌ 不可达',
                    'closed': '❌ 关闭',
                    'unresolved': '❌ 未解析',
                    'timeout': '⏰ 超时',
                    'error': '⚠️ 错误',
                    'connection_error': '❌ 连接错误',
                    'dns_error': '❌ DNS错误'
                }
                
                status_text = status_display.get(test_result['status'], test_result['status'])
                report.append(f"  {test_type.upper()}: {status_text}")
                
                if 'error' in test_result:
                    report.append(f"    错误: {test_result['error']}")
                elif test_type == 'ping':
                    if 'packet_loss' in test_result:
                        report.append(f"    丢包率: {test_result['packet_loss']}%")
                    if 'avg_rtt' in test_result and test_result['avg_rtt'] > 0:
                        report.append(f"    平均延迟: {test_result['avg_rtt']}ms")
                elif test_type == 'port':
                    report.append(f"    端口: {test_result['port']}")
                elif test_type == 'http':
                    if 'status_code' in test_result:
                        report.append(f"    状态码: {test_result['status_code']}")
                    if 'response_time' in test_result:
                        report.append(f"    响应时间: {test_result['response_time']:.3f}s")
                elif test_type == 'dns':
                    if 'ip_addresses' in test_result:
                        report.append(f"    IP地址: {', '.join(test_result['ip_addresses'])}")
            report.append("")
            
        # 错误信息
        if self.errors:
            report.append("错误信息:")
            report.append("-" * 50)
            for error in self.errors[:10]:  # 只显示前10个错误
                report.append(f"  {error}")
            if len(self.errors) > 10:
                report.append(f"  ... 还有 {len(self.errors) - 10} 个错误")
            report.append("")
            
        report.append("=" * 80)
        return "\n".join(report)
        
    def save_report(self, filename):
        """保存报告到文件"""
        try:
            report = self.generate_report()
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"报告已保存到: {filename}")
            return True
        except Exception as e:
            print(f"保存报告时出错: {e}")
            return False
            
    def save_json_report(self, filename):
        """保存JSON格式报告"""
        try:
            report_data = {
                'check_time': datetime.now().isoformat(),
                'results': self.results,
                'errors': self.errors
            }
            
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(report_data, f, indent=2, ensure_ascii=False)
            print(f"JSON报告已保存到: {filename}")
            return True
        except Exception as e:
            print(f"保存JSON报告时出错: {e}")
            return False
            
    def continuous_monitor(self, targets, interval=60, max_checks=None):
        """持续监控网络连通性"""
        check_count = 0
        
        try:
            while True:
                print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始第 {check_count + 1} 次检查")
                self.check_multiple_targets(targets)
                
                # 显示简要状态
                success_count = 0
                total_tests = 0
                
                for result in self.results:
                    for test_result in result['tests'].values():
                        total_tests += 1
                        if test_result['status'] in ['reachable', 'open', 'resolved', 'accessible']:
                            success_count += 1
                            
                print(f"成功测试: {success_count}/{total_tests}")
                
                check_count += 1
                
                # 检查是否达到最大检查次数
                if max_checks and check_count >= max_checks:
                    break
                    
                # 等待下次检查
                time.sleep(interval)
                
        except KeyboardInterrupt:
            print("\n\n停止监控...")

def main():
    parser = argparse.ArgumentParser(description="网络连通性检查工具")
    parser.add_argument("targets", nargs='*', help="要检查的目标(主机名、IP地址、URL等)")
    parser.add_argument("-f", "--file", help="从文件读取目标列表")
    parser.add_argument("-o", "--output", help="保存报告到文件")
    parser.add_argument("-j", "--json", help="保存JSON报告到文件")
    parser.add_argument("-m", "--monitor", type=int, help="持续监控间隔(秒)")
    parser.add_argument("--max-checks", type=int, help="最大检查次数")
    parser.add_argument("--timeout", type=int, default=3, help="网络操作超时时间(秒,默认:3)")
    
    args = parser.parse_args()
    
    # 检查目标
    targets = []
    if args.targets:
        targets.extend(args.targets)
    if args.file:
        try:
            with open(args.file, 'r', encoding='utf-8') as f:
                file_targets = [line.strip() for line in f if line.strip()]
                targets.extend(file_targets)
        except Exception as e:
            print(f"读取目标文件时出错: {e}")
            sys.exit(1)
            
    if not targets:
        print("请指定要检查的目标")
        parser.print_help()
        sys.exit(1)
        
    # 检查requests库
    try:
        import requests
    except ImportError:
        print("警告: 缺少requests库,HTTP测试功能将不可用")
        print("安装命令: pip install requests")
        
    try:
        checker = NetworkConnectivityChecker()
        
        # 检查网络连通性
        if args.monitor:
            # 持续监控模式
            checker.continuous_monitor(targets, args.monitor, args.max_checks)
        else:
            # 单次检查模式
            checker.check_multiple_targets(targets)
            
            # 生成报告
            if args.output:
                checker.save_report(args.output)
            elif args.json:
                checker.save_json_report(args.json)
            else:
                print(checker.generate_report())
                
    except KeyboardInterrupt:
        print("\n\n用户中断操作")
    except Exception as e:
        print(f"程序执行出错: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

使用方法

安装依赖

在使用此脚本之前,需要安装必要的库:

# HTTP测试需要
pip install requests

基本使用

# 检查单个目标
python network_checker.py google.com

# 检查多个目标
python network_checker.py google.com github.com 8.8.8.8

# 检查特定端口
python network_checker.py google.com:443 github.com:22

# 检查HTTP服务
python network_checker.py https://www.google.com http://github.com

# 从文件读取目标列表
python network_checker.py -f targets.txt

# 保存报告到文件
python network_checker.py google.com github.com -o network_report.txt

# 保存JSON报告
python network_checker.py google.com github.com -j network_report.json

# 持续监控(每30秒检查一次)
python network_checker.py google.com github.com -m 30

# 持续监控最多10次
python network_checker.py google.com github.com -m 30 --max-checks 10

目标文件示例

创建目标文件 targets.txt

google.com
github.com
8.8.8.8
1.1.1.1
https://www.baidu.com
google.com:443
github.com:22

命令行参数说明

使用示例

基本检查

python network_checker.py google.com github.com 8.8.8.8

持续监控

python network_checker.py google.com github.com -m 60 --max-checks 10 -o status.log

综合测试

python network_checker.py -f targets.txt -o full_report.txt -j report.json

总结

这个网络连通性检查工具提供了一个全面的网络诊断解决方案,能够测试Ping连通性、端口开放状态、DNS解析和HTTP服务可达性。它支持批量测试多个目标,提供详细的测试报告,并可以持续监控网络状态。工具适用于网络故障排查、系统监控和运维自动化等多种场景。通过这个工具,用户可以快速诊断网络问题,确保网络服务的稳定运行。

以上就是Python开发一个实用的网络连通性检查工具的详细内容,更多关于Python网络连通性检查的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文