Python开发一个实用的网络连通性检查工具
作者:零日失眠者
网络连通性是现代计算机系统正常运行的基础,本文将介绍一个实用的Python网络连通性检查工具,可以全面检查网络连接状态,帮助用户快速诊断网络问题,有需要的小伙伴可以了解下
简介
网络连通性是现代计算机系统正常运行的基础。无论是访问互联网、连接内部网络还是与其他设备通信,网络连接的稳定性都至关重要。当网络出现问题时,快速诊断和定位故障点是解决问题的关键。本文将介绍一个实用的Python脚本——网络连通性检查工具,它可以全面检查网络连接状态,帮助用户快速诊断网络问题。
功能介绍
这个网络连通性检查工具具有以下核心功能:
- Ping测试:测试与目标主机的ICMP连通性
- 端口检查:检查指定主机的端口是否开放
- DNS解析测试:验证域名解析功能
- HTTP连通性测试:测试Web服务的可达性
- 多目标批量测试:支持同时测试多个目标
- 详细报告生成:提供详细的测试结果报告
- 定时监控:支持定期检查网络连通性
- 告警通知:在网络故障时发送告警通知
应用场景
这个工具适用于以下场景:
- 网络故障排查:快速诊断网络连接问题
- 系统监控:监控关键网络服务的可用性
- 运维自动化:集成到运维流程中自动检查网络状态
- 网络性能测试:评估网络连接的质量和延迟
- 安全审计:检查网络服务的开放状态
- 灾难恢复:在网络恢复后验证连接状态
报错处理
脚本包含了完善的错误处理机制:
- 超时处理:合理设置网络操作的超时时间
- 异常捕获:捕获并处理网络操作中的各种异常
- 权限验证:检查是否有足够的权限执行网络测试
- DNS解析保护:处理DNS解析失败的情况
- 连接重试:在网络不稳定时自动重试连接
- 资源清理:确保网络连接和资源得到正确释放
代码实现
import os
import sys
import subprocess
import socket
import argparse
import json
import time
from datetime import datetime
import threading
import requests
from urllib.parse import urlparse
class NetworkConnectivityChecker:
def __init__(self):
self.results = []
self.errors = []
self.monitoring = False
def ping_host(self, host, count=4, timeout=3):
"""Ping测试"""
try:
# 根据操作系统选择ping命令
if os.name == 'nt': # Windows
cmd = ['ping', '-n', str(count), '-w', str(timeout * 1000), host]
else: # Linux/macOS
cmd = ['ping', '-c', str(count), '-W', str(timeout), host]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout * count + 5)
if result.returncode == 0:
# 解析ping输出获取统计信息
output = result.stdout
packet_loss = 0
avg_rtt = 0
if os.name == 'nt': # Windows
# Windows ping输出解析
for line in output.split('\n'):
if 'Lost' in line:
# 匹配 "Lost = X (X% loss)"
import re
match = re.search(r'\((\d+)% loss\)', line)
if match:
packet_loss = int(match.group(1))
elif 'Average' in line:
# 匹配 "Average = Xms"
import re
match = re.search(r'Average = (\d+)ms', line)
if match:
avg_rtt = int(match.group(1))
else: # Linux/macOS
# Unix-like系统ping输出解析
for line in output.split('\n'):
if 'packet loss' in line:
# 匹配 "X packets transmitted, Y received, Z% packet loss"
import re
match = re.search(r'(\d+)% packet loss', line)
if match:
packet_loss = int(match.group(1))
elif 'rtt' in line:
# 匹配 "rtt min/avg/max/mdev = X/Y/Z/W ms"
import re
match = re.search(r'rtt [^=]+= [^/]+/([^/]+)/', line)
if match:
avg_rtt = float(match.group(1))
return {
'host': host,
'status': 'reachable',
'packet_loss': packet_loss,
'avg_rtt': avg_rtt,
'output': output
}
else:
return {
'host': host,
'status': 'unreachable',
'error': result.stderr or 'Host unreachable',
'output': result.stdout
}
except subprocess.TimeoutExpired:
return {
'host': host,
'status': 'timeout',
'error': f'Ping timeout after {timeout * count + 5} seconds'
}
except Exception as e:
return {
'host': host,
'status': 'error',
'error': str(e)
}
def check_port(self, host, port, timeout=3):
"""端口检查"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
if result == 0:
return {
'host': host,
'port': port,
'status': 'open'
}
else:
return {
'host': host,
'port': port,
'status': 'closed',
'error': f'Connection refused (code: {result})'
}
except socket.gaierror as e:
return {
'host': host,
'port': port,
'status': 'dns_error',
'error': f'DNS resolution failed: {e}'
}
except Exception as e:
return {
'host': host,
'port': port,
'status': 'error',
'error': str(e)
}
def resolve_dns(self, hostname):
"""DNS解析测试"""
try:
ip_addresses = socket.getaddrinfo(hostname, None)
ips = []
for addr_info in ip_addresses:
ip = addr_info[4][0]
if ip not in ips:
ips.append(ip)
return {
'hostname': hostname,
'status': 'resolved',
'ip_addresses': ips
}
except socket.gaierror as e:
return {
'hostname': hostname,
'status': 'unresolved',
'error': str(e)
}
except Exception as e:
return {
'hostname': hostname,
'status': 'error',
'error': str(e)
}
def http_check(self, url, timeout=10):
"""HTTP连通性测试"""
try:
parsed_url = urlparse(url)
if not parsed_url.scheme:
url = 'http://' + url
response = requests.get(url, timeout=timeout)
return {
'url': url,
'status': 'accessible',
'status_code': response.status_code,
'response_time': response.elapsed.total_seconds(),
'content_length': len(response.content)
}
except requests.exceptions.Timeout:
return {
'url': url,
'status': 'timeout',
'error': f'Request timeout after {timeout} seconds'
}
except requests.exceptions.ConnectionError as e:
return {
'url': url,
'status': 'connection_error',
'error': str(e)
}
except requests.exceptions.RequestException as e:
return {
'url': url,
'status': 'error',
'error': str(e)
}
except Exception as e:
return {
'url': url,
'status': 'error',
'error': str(e)
}
def check_target(self, target):
"""检查单个目标"""
result = {
'target': target,
'timestamp': datetime.now().isoformat(),
'tests': {}
}
# 判断目标类型
if target.startswith('http://') or target.startswith('https://'):
# HTTP测试
result['tests']['http'] = self.http_check(target)
elif ':' in target and not target.replace(':', '').replace('.', '').isdigit():
# 主机名:端口格式
host, port = target.rsplit(':', 1)
if port.isdigit():
result['tests']['ping'] = self.ping_host(host)
result['tests']['port'] = self.check_port(host, int(port))
result['tests']['dns'] = self.resolve_dns(host)
else:
result['tests']['ping'] = self.ping_host(target)
result['tests']['dns'] = self.resolve_dns(target)
elif target.replace('.', '').isdigit() or not target.replace('-', '').replace('.', '').isdigit():
# IP地址或主机名
result['tests']['ping'] = self.ping_host(target)
result['tests']['dns'] = self.resolve_dns(target)
else:
# 默认进行ping测试
result['tests']['ping'] = self.ping_host(target)
return result
def check_multiple_targets(self, targets):
"""检查多个目标"""
print(f"开始检查 {len(targets)} 个目标")
print("=" * 60)
self.results = []
for target in targets:
print(f"检查目标: {target}")
result = self.check_target(target)
self.results.append(result)
# 显示结果
tests = result['tests']
for test_type, test_result in tests.items():
status_icons = {
'reachable': '✅',
'open': '✅',
'resolved': '✅',
'accessible': '✅',
'unreachable': '❌',
'closed': '❌',
'unresolved': '❌',
'timeout': '⏰',
'error': '⚠️',
'connection_error': '❌',
'dns_error': '❌'
}
icon = status_icons.get(test_result['status'], '❓')
print(f" {test_type.upper()}: {icon} {test_result['status']}")
if 'error' in test_result:
print(f" 错误: {test_result['error']}")
elif test_type == 'ping':
if 'packet_loss' in test_result:
print(f" 丢包率: {test_result['packet_loss']}%")
if 'avg_rtt' in test_result and test_result['avg_rtt'] > 0:
print(f" 平均延迟: {test_result['avg_rtt']}ms")
elif test_type == 'port':
print(f" 端口: {test_result['port']}")
elif test_type == 'http':
if 'status_code' in test_result:
print(f" 状态码: {test_result['status_code']}")
if 'response_time' in test_result:
print(f" 响应时间: {test_result['response_time']:.3f}s")
elif test_type == 'dns':
if 'ip_addresses' in test_result:
print(f" IP地址: {', '.join(test_result['ip_addresses'])}")
print()
return self.results
def generate_report(self):
"""生成检查报告"""
report = []
report.append("=" * 80)
report.append("网络连通性检查报告")
report.append("=" * 80)
report.append(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"检查目标数: {len(self.results)}")
report.append("")
# 统计信息
stats = {
'reachable': 0,
'unreachable': 0,
'timeout': 0,
'error': 0
}
for result in self.results:
for test_result in result['tests'].values():
status = test_result['status']
if status in stats:
stats[status] += 1
elif status not in ['open', 'resolved', 'accessible']:
stats['error'] += 1
report.append("状态统计:")
report.append(f" 可达: {stats['reachable']}")
report.append(f" 不可达: {stats['unreachable']}")
report.append(f" 超时: {stats['timeout']}")
report.append(f" 错误: {stats['error']}")
report.append("")
# 详细结果
report.append("详细检查结果:")
report.append("-" * 50)
for result in self.results:
report.append(f"目标: {result['target']}")
for test_type, test_result in result['tests'].items():
status_display = {
'reachable': '✅ 可达',
'open': '✅ 开放',
'resolved': '✅ 已解析',
'accessible': '✅ 可访问',
'unreachable': '❌ 不可达',
'closed': '❌ 关闭',
'unresolved': '❌ 未解析',
'timeout': '⏰ 超时',
'error': '⚠️ 错误',
'connection_error': '❌ 连接错误',
'dns_error': '❌ DNS错误'
}
status_text = status_display.get(test_result['status'], test_result['status'])
report.append(f" {test_type.upper()}: {status_text}")
if 'error' in test_result:
report.append(f" 错误: {test_result['error']}")
elif test_type == 'ping':
if 'packet_loss' in test_result:
report.append(f" 丢包率: {test_result['packet_loss']}%")
if 'avg_rtt' in test_result and test_result['avg_rtt'] > 0:
report.append(f" 平均延迟: {test_result['avg_rtt']}ms")
elif test_type == 'port':
report.append(f" 端口: {test_result['port']}")
elif test_type == 'http':
if 'status_code' in test_result:
report.append(f" 状态码: {test_result['status_code']}")
if 'response_time' in test_result:
report.append(f" 响应时间: {test_result['response_time']:.3f}s")
elif test_type == 'dns':
if 'ip_addresses' in test_result:
report.append(f" IP地址: {', '.join(test_result['ip_addresses'])}")
report.append("")
# 错误信息
if self.errors:
report.append("错误信息:")
report.append("-" * 50)
for error in self.errors[:10]: # 只显示前10个错误
report.append(f" {error}")
if len(self.errors) > 10:
report.append(f" ... 还有 {len(self.errors) - 10} 个错误")
report.append("")
report.append("=" * 80)
return "\n".join(report)
def save_report(self, filename):
"""保存报告到文件"""
try:
report = self.generate_report()
with open(filename, 'w', encoding='utf-8') as f:
f.write(report)
print(f"报告已保存到: {filename}")
return True
except Exception as e:
print(f"保存报告时出错: {e}")
return False
def save_json_report(self, filename):
"""保存JSON格式报告"""
try:
report_data = {
'check_time': datetime.now().isoformat(),
'results': self.results,
'errors': self.errors
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report_data, f, indent=2, ensure_ascii=False)
print(f"JSON报告已保存到: {filename}")
return True
except Exception as e:
print(f"保存JSON报告时出错: {e}")
return False
def continuous_monitor(self, targets, interval=60, max_checks=None):
"""持续监控网络连通性"""
check_count = 0
try:
while True:
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始第 {check_count + 1} 次检查")
self.check_multiple_targets(targets)
# 显示简要状态
success_count = 0
total_tests = 0
for result in self.results:
for test_result in result['tests'].values():
total_tests += 1
if test_result['status'] in ['reachable', 'open', 'resolved', 'accessible']:
success_count += 1
print(f"成功测试: {success_count}/{total_tests}")
check_count += 1
# 检查是否达到最大检查次数
if max_checks and check_count >= max_checks:
break
# 等待下次检查
time.sleep(interval)
except KeyboardInterrupt:
print("\n\n停止监控...")
def main():
parser = argparse.ArgumentParser(description="网络连通性检查工具")
parser.add_argument("targets", nargs='*', help="要检查的目标(主机名、IP地址、URL等)")
parser.add_argument("-f", "--file", help="从文件读取目标列表")
parser.add_argument("-o", "--output", help="保存报告到文件")
parser.add_argument("-j", "--json", help="保存JSON报告到文件")
parser.add_argument("-m", "--monitor", type=int, help="持续监控间隔(秒)")
parser.add_argument("--max-checks", type=int, help="最大检查次数")
parser.add_argument("--timeout", type=int, default=3, help="网络操作超时时间(秒,默认:3)")
args = parser.parse_args()
# 检查目标
targets = []
if args.targets:
targets.extend(args.targets)
if args.file:
try:
with open(args.file, 'r', encoding='utf-8') as f:
file_targets = [line.strip() for line in f if line.strip()]
targets.extend(file_targets)
except Exception as e:
print(f"读取目标文件时出错: {e}")
sys.exit(1)
if not targets:
print("请指定要检查的目标")
parser.print_help()
sys.exit(1)
# 检查requests库
try:
import requests
except ImportError:
print("警告: 缺少requests库,HTTP测试功能将不可用")
print("安装命令: pip install requests")
try:
checker = NetworkConnectivityChecker()
# 检查网络连通性
if args.monitor:
# 持续监控模式
checker.continuous_monitor(targets, args.monitor, args.max_checks)
else:
# 单次检查模式
checker.check_multiple_targets(targets)
# 生成报告
if args.output:
checker.save_report(args.output)
elif args.json:
checker.save_json_report(args.json)
else:
print(checker.generate_report())
except KeyboardInterrupt:
print("\n\n用户中断操作")
except Exception as e:
print(f"程序执行出错: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
使用方法
安装依赖
在使用此脚本之前,需要安装必要的库:
# HTTP测试需要 pip install requests
基本使用
# 检查单个目标 python network_checker.py google.com # 检查多个目标 python network_checker.py google.com github.com 8.8.8.8 # 检查特定端口 python network_checker.py google.com:443 github.com:22 # 检查HTTP服务 python network_checker.py https://www.google.com http://github.com # 从文件读取目标列表 python network_checker.py -f targets.txt # 保存报告到文件 python network_checker.py google.com github.com -o network_report.txt # 保存JSON报告 python network_checker.py google.com github.com -j network_report.json # 持续监控(每30秒检查一次) python network_checker.py google.com github.com -m 30 # 持续监控最多10次 python network_checker.py google.com github.com -m 30 --max-checks 10
目标文件示例
创建目标文件 targets.txt:
google.com
github.com
8.8.8.8
1.1.1.1
https://www.baidu.com
google.com:443
github.com:22
命令行参数说明
targets: 要检查的目标(可多个)-f, --file: 从文件读取目标列表-o, --output: 保存报告到指定文件-j, --json: 保存JSON报告到指定文件-m, --monitor: 持续监控间隔(秒)--max-checks: 最大检查次数--timeout: 网络操作超时时间(秒,默认:3)
使用示例
基本检查:
python network_checker.py google.com github.com 8.8.8.8
持续监控:
python network_checker.py google.com github.com -m 60 --max-checks 10 -o status.log
综合测试:
python network_checker.py -f targets.txt -o full_report.txt -j report.json
总结
这个网络连通性检查工具提供了一个全面的网络诊断解决方案,能够测试Ping连通性、端口开放状态、DNS解析和HTTP服务可达性。它支持批量测试多个目标,提供详细的测试报告,并可以持续监控网络状态。工具适用于网络故障排查、系统监控和运维自动化等多种场景。通过这个工具,用户可以快速诊断网络问题,确保网络服务的稳定运行。
以上就是Python开发一个实用的网络连通性检查工具的详细内容,更多关于Python网络连通性检查的资料请关注脚本之家其它相关文章!
