python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python检测代理IP可用性

Python批量检测代理IP可用性的程序

作者:Love丶伊卡洛斯

这篇文章主要为大家详细介绍了如何通过Python实现一个代理池测试工具,支持测试代理的连通性、响应时间和匿名性,感兴趣的小伙伴可以跟随小编一起学习一下

前言

实现一个代理池测试工具,支持测试代理的连通性、响应时间和匿名性。用户需将代理配置写入.env文件,支持JSON数组、JSON对象和逗号分隔三种格式。测试过程会记录每个代理的状态(可用/不可用)、响应时间、HTTP状态码和匿名等级,并生成详细的测试报告,包括成功率统计、性能指标和可用代理列表。核心功能通过ProxyTester类实现,支持同步(线程池,默认)和异步(aiohttp,通过 --async-mode 开启)两种测试方式,提供全面的代理检测能力。

将代理IP写入.env文件(或环境变量)的PROXY_CONFIG中,然后运行 python main.py --config 即可

依赖库自行安装:pip install requests aiohttp loguru python-dotenv

输出效果

2025-10-23 21:30:41.145 | INFO     | __main__:test_proxies_sync:239 - 开始同步测试 2 个代理...
2025-10-23 21:30:42.236 | INFO     | __main__:test_proxies_sync:251 - ❌ [1/2] http://170.114.45.249:80 - 不可用 (1.09s)
2025-10-23 21:30:42.492 | INFO     | __main__:test_proxies_sync:251 - ✅ [2/2] http://127.0.0.1:10808 - 可用 (1.35s)

============================================================
🔍 代理池测试报告
============================================================
📊 测试概要:
  总代理数: 2
  可用代理: 1 ✅
  不可用代理: 1 ❌
  成功率: 50.00%

⚡ 性能统计:
  平均响应时间: 1.35s
  最快响应时间: 1.35s
  最慢响应时间: 1.35s

🔒 匿名性统计:
  transparent: 1个

✅ 可用代理列表:
  http://127.0.0.1:10808

❌ 不可用代理列表:
  http://170.114.45.249:80 - HTTPSConnectionPool(host='httpbin.org', port=443): Max retries exceeded with url: /ip (Caused by ProxyError('Unable to connect to proxy', OSError('Tunnel connection failed: 400 Bad Request')))

.env

# 代理池配置 - 支持多种格式(以下三种方式任选其一,实际使用时只保留一个 PROXY_CONFIG)

# 方式1:JSON数组格式(推荐)
PROXY_CONFIG='["http://127.0.0.1:8080", "http://proxy2.com:8080", "http://user:pass@proxy3.com:8080"]'

# 方式2:JSON对象格式(支持更多配置)
PROXY_CONFIG='{"proxies": ["http://127.0.0.1:8080", "http://proxy2.com:8080"], "timeout": 10}'

# 方式3:逗号分隔格式(兼容旧版)
PROXY_CONFIG="http://127.0.0.1:8080,http://proxy2.com:8080,http://user:pass@proxy3.com:8080"

# 代理测试相关配置(注意:下文的 main.py 并未读取这两项,超时时间请通过 --timeout 参数指定)
PROXY_TEST_URL="https://httpbin.org/ip"
PROXY_TEST_TIMEOUT="5"

main.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
代理池可用性测试程序
测试代理的连通性、响应时间、匿名性等指标

python main.py --direct --timeout 5 

python main.py --config --timeout 5 
"""

import asyncio
import aiohttp
import time
import json
import argparse
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import requests
from loguru import logger

@dataclass
class ProxyTestResult:
    """Outcome of probing one proxy (or the direct connection)."""
    # Proxy URL exactly as supplied by the caller; the direct-connection
    # probe stores the label "直连模式" here instead.
    proxy: str
    # True only when the test URL answered with HTTP 200.
    is_available: bool
    # Seconds elapsed between starting the request and receiving the
    # response (or the exception).
    response_time: float
    # HTTP status of the response; stays None when the request raised.
    status_code: Optional[int] = None
    # "HTTP <code>" for non-200 answers, or the stringified exception.
    error_message: Optional[str] = None
    # Values produced by this file: "transparent", "anonymous", "unknown",
    # or "direct" (direct-connection probe).
    anonymity_level: Optional[str] = None
    # Public IP of this machine as seen without a proxy.
    real_ip: Optional[str] = None
    # IP reported by the test endpoint for the proxied request.
    proxy_ip: Optional[str] = None

class ProxyTester:
    """Probe proxies for reachability, response time and anonymity.

    Only the first entry of ``test_urls`` is requested by the probes. The
    anonymity verdict compares the ``origin`` field of that endpoint's JSON
    answer with this machine's own public IP, which is looked up once when
    the tester is constructed.
    """

    # User-Agent header sent with every probe request.
    _USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

    def __init__(self,
                 test_urls: Optional[List[str]] = None,
                 timeout: int = 10,
                 max_workers: int = 20):
        """
        Create a tester and look up the machine's real public IP.

        Note that construction performs a network request (see
        ``_get_real_ip``).

        Args:
            test_urls: Candidate test URLs; only the first one is used.
                ``None`` or an empty list selects the built-in defaults.
            timeout: Per-request timeout in seconds for the probes.
            max_workers: Upper bound on concurrent probes.
        """
        self.test_urls = test_urls or [
            "https://httpbin.org/ip",
            "https://api.ipify.org?format=json",
            "https://ifconfig.me/ip"
        ]
        self.timeout = timeout
        self.max_workers = max_workers

        # Needed later to tell transparent proxies from anonymous ones.
        self.real_ip = self._get_real_ip()
        logger.info(f"本机真实IP: {self.real_ip}")

    @staticmethod
    def _extract_origin_ip(payload) -> Optional[str]:
        """Return the first address listed under ``origin`` in *payload*.

        Returns ``""`` when the key is absent and ``None`` when *payload* is
        not a mapping or ``origin`` is not a string.
        """
        try:
            return payload.get("origin", "").split(",")[0].strip()
        except (AttributeError, TypeError):
            return None

    def _get_real_ip(self) -> Optional[str]:
        """Fetch this machine's public IP from httpbin.org without a proxy.

        Returns:
            The IP string, or ``None`` when the lookup fails (a warning is
            logged) or the endpoint does not answer with HTTP 200.
        """
        try:
            response = requests.get("https://httpbin.org/ip", timeout=10)
            if response.status_code == 200:
                return response.json().get("origin", "").split(",")[0].strip()
        except Exception as e:
            logger.warning(f"获取真实IP失败: {e}")
        return None

    def test_proxy_sync(self, proxy: str) -> "ProxyTestResult":
        """Probe a single proxy with a blocking ``requests`` call.

        Args:
            proxy: Proxy URL, used for both http and https traffic.

        Returns:
            A ``ProxyTestResult``; never raises for ordinary request errors,
            which are reported through ``error_message`` instead.
        """
        # perf_counter is monotonic, so system clock adjustments cannot
        # distort the measured duration.
        start_time = time.perf_counter()

        try:
            response = requests.get(
                self.test_urls[0],
                proxies={'http': proxy, 'https': proxy},
                timeout=self.timeout,
                headers={'User-Agent': self._USER_AGENT}
            )
            response_time = time.perf_counter() - start_time

            if response.status_code != 200:
                return ProxyTestResult(
                    proxy=proxy,
                    is_available=False,
                    response_time=response_time,
                    status_code=response.status_code,
                    error_message=f"HTTP {response.status_code}"
                )

            # A non-JSON body only costs us the anonymity verdict; the proxy
            # itself still counts as available.
            try:
                payload = response.json()
            except Exception:
                payload = None
            proxy_ip = self._extract_origin_ip(payload)

            return ProxyTestResult(
                proxy=proxy,
                is_available=True,
                response_time=response_time,
                status_code=response.status_code,
                anonymity_level=self._check_anonymity(proxy_ip),
                real_ip=self.real_ip,
                proxy_ip=proxy_ip
            )

        except Exception as e:
            return ProxyTestResult(
                proxy=proxy,
                is_available=False,
                response_time=time.perf_counter() - start_time,
                error_message=str(e)
            )

    async def test_proxy_async(self, session: "aiohttp.ClientSession", proxy: str) -> "ProxyTestResult":
        """Probe a single proxy through an existing aiohttp session.

        Args:
            session: Open client session used to issue the request.
            proxy: Proxy URL passed to aiohttp's ``proxy`` argument.

        Returns:
            A ``ProxyTestResult``; ordinary request errors are reported
            through ``error_message``. Task cancellation is not swallowed.
        """
        start_time = time.perf_counter()

        try:
            async with session.get(
                self.test_urls[0],
                proxy=proxy,
                timeout=aiohttp.ClientTimeout(total=self.timeout),
                headers={'User-Agent': self._USER_AGENT}
            ) as response:
                response_time = time.perf_counter() - start_time

                if response.status != 200:
                    return ProxyTestResult(
                        proxy=proxy,
                        is_available=False,
                        response_time=response_time,
                        status_code=response.status,
                        error_message=f"HTTP {response.status}"
                    )

                # `except Exception` (not a bare except) so that
                # asyncio.CancelledError still propagates.
                try:
                    payload = await response.json()
                except Exception:
                    payload = None
                proxy_ip = self._extract_origin_ip(payload)

                return ProxyTestResult(
                    proxy=proxy,
                    is_available=True,
                    response_time=response_time,
                    status_code=response.status,
                    anonymity_level=self._check_anonymity(proxy_ip),
                    real_ip=self.real_ip,
                    proxy_ip=proxy_ip
                )

        except Exception as e:
            return ProxyTestResult(
                proxy=proxy,
                is_available=False,
                response_time=time.perf_counter() - start_time,
                error_message=str(e)
            )

    def _check_anonymity(self, proxy_ip: Optional[str]) -> str:
        """Classify a proxy by comparing its exit IP with our real IP.

        Returns:
            ``"unknown"`` when either IP is missing, ``"transparent"`` when
            the endpoint still saw our real IP, otherwise ``"anonymous"``.
        """
        if not proxy_ip or not self.real_ip:
            return "unknown"
        # Same IP on both sides means the proxy did not hide us.
        return "transparent" if proxy_ip == self.real_ip else "anonymous"

    def test_direct_connection(self) -> "ProxyTestResult":
        """Probe the test URL without any proxy.

        Returns:
            A ``ProxyTestResult`` labelled "直连模式"; on success both
            ``real_ip`` and ``proxy_ip`` hold the IP the endpoint reported.
        """
        start_time = time.perf_counter()

        try:
            response = requests.get(
                self.test_urls[0],
                timeout=self.timeout,
                headers={'User-Agent': self._USER_AGENT}
            )
            response_time = time.perf_counter() - start_time

            if response.status_code != 200:
                return ProxyTestResult(
                    proxy="直连模式",
                    is_available=False,
                    response_time=response_time,
                    status_code=response.status_code,
                    error_message=f"HTTP {response.status_code}"
                )

            try:
                payload = response.json()
            except Exception:
                payload = None
            real_ip = self._extract_origin_ip(payload)

            return ProxyTestResult(
                proxy="直连模式",
                is_available=True,
                response_time=response_time,
                status_code=response.status_code,
                anonymity_level="direct",
                real_ip=real_ip,
                proxy_ip=real_ip
            )

        except Exception as e:
            return ProxyTestResult(
                proxy="直连模式",
                is_available=False,
                response_time=time.perf_counter() - start_time,
                error_message=str(e)
            )

    def test_proxies_sync(self, proxies: List[str]) -> "List[ProxyTestResult]":
        """Probe many proxies concurrently on a thread pool.

        Results are collected and logged in submission order. A probe whose
        future raises is logged and left out of the returned list.
        """
        logger.info(f"开始同步测试 {len(proxies)} 个代理...")

        results = []
        # ThreadPoolExecutor rejects max_workers <= 0, so fall back to one.
        with ThreadPoolExecutor(max_workers=max(1, self.max_workers)) as executor:
            futures = [executor.submit(self.test_proxy_sync, proxy) for proxy in proxies]

            for i, future in enumerate(futures):
                try:
                    result = future.result()
                    results.append(result)

                    status = "✅" if result.is_available else "❌"
                    logger.info(f"{status} [{i+1}/{len(proxies)}] {result.proxy} - "
                              f"{'可用' if result.is_available else '不可用'} "
                              f"({result.response_time:.2f}s)")

                except Exception as e:
                    logger.error(f"测试代理失败: {e}")

        return results

    async def test_proxies_async(self, proxies: List[str]) -> "List[ProxyTestResult]":
        """Probe many proxies concurrently with aiohttp.

        The connector limit caps simultaneous connections at
        ``max_workers``. Anything gathered that is not a ``ProxyTestResult``
        is logged and left out of the returned list.
        """
        logger.info(f"开始异步测试 {len(proxies)} 个代理...")

        connector = aiohttp.TCPConnector(limit=self.max_workers)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.test_proxy_async(session, proxy) for proxy in proxies]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            valid_results = []
            for i, result in enumerate(results):
                if isinstance(result, ProxyTestResult):
                    valid_results.append(result)
                    status = "✅" if result.is_available else "❌"
                    logger.info(f"{status} [{i+1}/{len(proxies)}] {result.proxy} - "
                              f"{'可用' if result.is_available else '不可用'} "
                              f"({result.response_time:.2f}s)")
                else:
                    logger.error(f"测试代理异常: {result}")

            return valid_results

    def generate_report(self, results: "List[ProxyTestResult]") -> Dict:
        """Aggregate probe results into a JSON-serialisable report.

        Returns:
            A dict with ``summary`` (counts and success rate),
            ``performance`` (avg/min/max response time of available proxies,
            formatted as ``"<x.xx>s"``), ``anonymity`` (count per level among
            available proxies), ``available_proxies`` (list of proxy URLs)
            and ``unavailable_proxies`` (list of ``{"proxy", "error"}``).
        """
        total_count = len(results)
        available = [r for r in results if r.is_available]
        available_count = len(available)

        # Timing statistics only cover proxies that actually worked.
        times = [r.response_time for r in available]
        if times:
            avg_response_time = sum(times) / len(times)
            min_response_time = min(times)
            max_response_time = max(times)
        else:
            avg_response_time = min_response_time = max_response_time = 0

        anonymity_stats = {}
        for result in available:
            if result.anonymity_level:
                anonymity_stats[result.anonymity_level] = anonymity_stats.get(result.anonymity_level, 0) + 1

        return {
            "summary": {
                "total_proxies": total_count,
                "available_proxies": available_count,
                "unavailable_proxies": total_count - available_count,
                "success_rate": f"{(available_count / total_count * 100):.2f}%" if total_count > 0 else "0%"
            },
            "performance": {
                "avg_response_time": f"{avg_response_time:.2f}s",
                "min_response_time": f"{min_response_time:.2f}s",
                "max_response_time": f"{max_response_time:.2f}s"
            },
            "anonymity": anonymity_stats,
            "available_proxies": [r.proxy for r in available],
            "unavailable_proxies": [
                {"proxy": r.proxy, "error": r.error_message}
                for r in results if not r.is_available
            ]
        }

    def print_report(self, report: Dict):
        """Print a report produced by ``generate_report`` to stdout.

        Sections with nothing to show (performance, anonymity, proxy lists)
        are omitted.
        """
        print("\n" + "="*60)
        print("🔍 代理池测试报告")
        print("="*60)

        summary = report["summary"]
        print("📊 测试概要:")
        print(f"   总代理数: {summary['total_proxies']}")
        print(f"   可用代理: {summary['available_proxies']} ✅")
        print(f"   不可用代理: {summary['unavailable_proxies']} ❌")
        print(f"   成功率: {summary['success_rate']}")

        if summary['available_proxies'] > 0:
            perf = report["performance"]
            print("\n⚡ 性能统计:")
            print(f"   平均响应时间: {perf['avg_response_time']}")
            print(f"   最快响应时间: {perf['min_response_time']}")
            print(f"   最慢响应时间: {perf['max_response_time']}")

        if report["anonymity"]:
            print("\n🔒 匿名性统计:")
            for level, count in report["anonymity"].items():
                print(f"   {level}: {count}个")

        if report["available_proxies"]:
            print("\n✅ 可用代理列表:")
            for proxy in report["available_proxies"]:
                print(f"   {proxy}")

        if report["unavailable_proxies"]:
            print("\n❌ 不可用代理列表:")
            for item in report["unavailable_proxies"]:
                print(f"   {item['proxy']} - {item['error']}")

        print("="*60)

def _split_proxy_list(raw: str) -> List[str]:
    """Split a comma- or newline-separated string into non-empty proxy URLs."""
    return [p.strip() for p in raw.replace('\n', ',').split(',') if p.strip()]


def _parse_proxy_config(raw: str) -> Optional[List[str]]:
    """Parse a PROXY_CONFIG value; return None when it holds no proxy list.

    Accepted forms: a JSON array of strings, a JSON object with a
    ``"proxies"`` entry (array or delimited string), or a plain comma/newline
    separated string. Text that looks like JSON but fails to parse is treated
    as a delimited string.
    """
    if not raw.strip().startswith(('[', '{')):
        return _split_proxy_list(raw)

    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return _split_proxy_list(raw)

    if isinstance(data, dict):
        data = data.get('proxies')
    if isinstance(data, str):
        # A bare string must not be handed back as-is: callers iterate the
        # result and would otherwise test it character by character.
        return _split_proxy_list(data)
    if isinstance(data, list):
        # Drop empty and non-string entries so every item is a usable URL.
        return [p.strip() for p in data if isinstance(p, str) and p.strip()]
    return None


def load_proxies_from_config(env_file: Optional[str] = None) -> List[str]:
    """Load the proxy list from PROXY_CONFIG, falling back to config.py.

    The .env file (``env_file`` if given, otherwise python-dotenv's default
    lookup) is loaded into the environment first. When PROXY_CONFIG is unset,
    empty, or contains no proxy list, ``PROXY_LIST`` is imported from a
    ``config`` module located next to this file.

    Args:
        env_file: Optional path of the .env file to load.

    Returns:
        The proxy URLs, or an empty list when loading fails for any reason
        (the failure is logged as a warning).
    """
    try:
        import sys
        import os
        from dotenv import load_dotenv

        if env_file:
            load_dotenv(env_file)
        else:
            load_dotenv()

        proxy_config = os.getenv("PROXY_CONFIG", "")
        if proxy_config:
            proxies = _parse_proxy_config(proxy_config)
            if proxies is not None:
                return proxies
            # Make the fall-through visible instead of failing later with an
            # unrelated-looking import error.
            logger.warning("PROXY_CONFIG 中未找到代理列表,尝试从config.py加载")

        # Fallback: PROXY_LIST from a config.py living beside this script.
        sys.path.append(os.path.dirname(__file__))
        from config import PROXY_LIST
        return PROXY_LIST

    except Exception as e:
        logger.warning(f"从配置文件加载代理失败: {e}")
        return []

def _save_report(report: Dict, output_path: str) -> None:
    """Write *report* as pretty-printed UTF-8 JSON; log instead of raising."""
    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        logger.info(f"报告已保存到: {output_path}")
    except Exception as e:
        logger.error(f"保存报告失败: {e}")


def _resolve_proxies(args: argparse.Namespace) -> Optional[List[str]]:
    """Pick the proxy list according to the CLI flags.

    Precedence: --config, then --file, then --proxies, then a built-in
    sample list. Returns None when the --file source could not be read
    (the error has already been logged).
    """
    if args.config:
        proxies = load_proxies_from_config(args.env_file)
        logger.info(f"从配置文件加载了 {len(proxies)} 个代理")
        return proxies

    if args.file:
        try:
            with open(args.file, 'r', encoding='utf-8') as f:
                proxies = [line.strip() for line in f if line.strip()]
        except Exception as e:
            logger.error(f"读取文件失败: {e}")
            return None
        logger.info(f"从文件 {args.file} 加载了 {len(proxies)} 个代理")
        return proxies

    if args.proxies:
        logger.info(f"使用命令行参数提供的 {len(args.proxies)} 个代理")
        return args.proxies

    # No source given: fall back to placeholder proxies for demonstration.
    proxies = [
        "http://127.0.0.1:8080",
        "http://127.0.0.1:8081",
        "http://proxy1.example.com:8080",
        "http://proxy2.example.com:8080"
    ]
    logger.info(f"使用示例代理列表 {len(proxies)} 个代理")
    return proxies


def main():
    """Command-line entry point.

    With --direct only the un-proxied connection is probed. Otherwise the
    proxy list is resolved from the flags, tested in sync or async mode, and
    the report is printed and optionally written to --output as JSON.
    """
    parser = argparse.ArgumentParser(description="代理池可用性测试程序")
    parser.add_argument("--proxies", "-p", nargs="+", help="代理列表")
    parser.add_argument("--file", "-f", help="从文件读取代理列表")
    parser.add_argument("--config", "-c", action="store_true",
                        help="从环境变量/.env的PROXY_CONFIG读取代理列表(未设置时回退到config.py)")
    parser.add_argument("--env-file", "-e", help="指定环境文件路径 (如 .env.test)")
    parser.add_argument("--direct", "-d", action="store_true", help="测试直连网络连通性")
    parser.add_argument("--timeout", "-t", type=int, default=10, help="请求超时时间(秒)")
    parser.add_argument("--workers", "-w", type=int, default=20, help="最大并发数")
    parser.add_argument("--async-mode", "-a", action="store_true", help="使用异步模式")
    parser.add_argument("--output", "-o", help="输出报告到文件")

    args = parser.parse_args()

    if args.direct:
        # Direct mode takes precedence over every proxy source.
        tester = ProxyTester(timeout=args.timeout, max_workers=args.workers)
        logger.info("测试直连网络连通性...")
        report = tester.generate_report([tester.test_direct_connection()])
        tester.print_report(report)
        if args.output:
            _save_report(report, args.output)
        return

    proxies = _resolve_proxies(args)
    if proxies is None:
        return
    if not proxies:
        logger.error("没有找到可测试的代理")
        return

    tester = ProxyTester(timeout=args.timeout, max_workers=args.workers)

    if args.async_mode:
        results = asyncio.run(tester.test_proxies_async(proxies))
    else:
        results = tester.test_proxies_sync(proxies)

    report = tester.generate_report(results)
    tester.print_report(report)

    if args.output:
        _save_report(report, args.output)

# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

到此这篇关于Python批量检测代理IP可用性的程序的文章就介绍到这了,更多相关Python检测代理IP可用性内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文