python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > 国内镜像源创建离线PyPI镜像

使用国内镜像源创建离线PyPI镜像的完整方案

作者:东方佑

根据知识库信息,清华镜像已明确会阻断大量下载行为的请求,为避免此问题,我将提供一个安全使用国内镜像源的完整方案,确保能够一次性准备指定Python版本的所有包,然后导出到内网环境,需要的朋友可以参考下

国内可用的PyPI镜像源

镜像源URL特点
阿里云https://mirrors.aliyun.com/pypi/simple/稳定性好,带宽充足
中国科学技术大学https://pypi.mirrors.ustc.edu.cn/simple/较宽松的下载限制
豆瓣https://pypi.douban.com/simple/历史较久,但限制较严格
华为云https://repo.huaweicloud.com/repository/pypi/simple/新兴镜像,带宽充足

建议:优先使用阿里云或中科大镜像,它们对下载行为的限制相对宽松。

安全下载策略(避免触发限制)

基于知识库的警告,我们需要实施极其谨慎的下载策略:

  1. 极低并发:最多1-2个并发下载
  2. 长随机延迟:10-30秒的随机延迟
  3. 小批次处理:每批次仅处理20-30个包
  4. 多镜像轮换:在阿里云和中科大镜像之间轮换
  5. 失败重试机制:对失败的请求进行智能重试

完整解决方案

1. 修改下载脚本 scripts/safe_download.py

#!/usr/bin/env python3
import os
import sys
import time
import random
import requests
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

# 安全配置参数 (根据知识库警告特别调整)
MAX_RETRIES = 5  # 增加重试次数
MIN_DELAY = 15  # 增加最小延迟至15秒 (避免触发限制)
MAX_DELAY = 30  # 增加最大延迟至30秒
MAX_WORKERS = 1  # 严格限制为单线程 (最关键的安全措施)
BATCH_SIZE = 20  # 减小批次大小

# 国内镜像源列表 (轮换使用)
MIRRORS = [
    "https://mirrors.aliyun.com/pypi/simple",
    "https://pypi.mirrors.ustc.edu.cn/simple"
]
CURRENT_MIRROR_IDX = 0

def get_next_mirror():
    """轮换使用镜像源"""
    global CURRENT_MIRROR_IDX
    mirror = MIRRORS[CURRENT_MIRROR_IDX]
    CURRENT_MIRROR_IDX = (CURRENT_MIRROR_IDX + 1) % len(MIRRORS)
    return mirror

def get_package_info(package_name):
    """获取包的元信息 - 使用国内镜像API"""
    # 使用随机镜像
    mirror = get_next_mirror()
    
    # 通过simple API获取信息 (更安全)
    url = f"{mirror}/{package_name}/"
    
    for i in range(MAX_RETRIES):
        try:
            # 添加随机User-Agent
            headers = {
                'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36'
            }
            
            response = requests.get(url, headers=headers, timeout=45)
            
            if response.status_code == 200:
                # 解析HTML获取版本信息
                import re
                versions = re.findall(r'href="[^" rel="external nofollow" ]+?#([^"]+)"', response.text)
                if versions:
                    return {"versions": versions}
                return None
            elif response.status_code == 404:
                return None
            
            # 遇到其他状态码,等待更长时间
            time.sleep((i + 1) * 10)
        except Exception as e:
            print(f"获取包 {package_name} 信息失败: {str(e)}")
            time.sleep((i + 1) * 15)
    
    return None

def filter_packages_for_python(package_data, python_version):
    """过滤出兼容指定Python版本的包"""
    if not package_data or 'versions' not in package_data:
        return []
    
    compatible_files = []
    py_ver = python_version.replace('.', '')
    
    # 这里简化处理,实际需要更复杂的版本匹配
    # 在完整实现中,应解析每个版本的wheel标签
    for version in package_data['versions']:
        # 简单检查是否包含Python版本标识
        if py_ver in version or 'py3' in version or 'any' in version:
            # 构建下载URL (需要更精确的解析)
            mirror = get_next_mirror()
            file_url = f"{mirror}/{version}"
            compatible_files.append({
                'url': file_url,
                'filename': version.split('/')[-1] if '/' in version else version,
                'python_version': python_version
            })
    
    return compatible_files

def download_package(package_name, file_info, target_dir):
    """安全下载单个包文件"""
    file_url = file_info['url']
    file_name = file_info['filename']
    target_path = os.path.join(target_dir, file_name)
    
    # 如果文件已存在,跳过
    if os.path.exists(target_path):
        print(f"跳过已存在的文件: {file_name}")
        return True
    
    print(f"下载: {file_name} ({package_name})")
    
    for i in range(MAX_RETRIES):
        try:
            # 添加随机User-Agent和Referer
            headers = {
                'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36',
                'Referer': get_next_mirror() + '/'
            }
            
            response = requests.get(file_url, headers=headers, stream=True, timeout=90)
            
            if response.status_code == 200:
                with open(target_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                print(f"成功下载: {file_name}")
                return True
            elif response.status_code == 404:
                print(f"文件不存在 (404): {file_name}")
                return False
            
            print(f"下载失败 ({response.status_code}): {file_name}")
            # 遇到错误状态码,等待更长时间
            time.sleep((i + 1) * 20)
        except Exception as e:
            print(f"下载 {file_name} 失败: {str(e)}")
            time.sleep((i + 1) * 25)
    
    # 下载失败,删除可能的部分文件
    if os.path.exists(target_path):
        os.remove(target_path)
    return False

def get_all_packages():
    """获取所有包的列表 - 使用国内镜像的simple页面"""
    print("获取所有包的列表...")
    
    # 使用随机镜像
    mirror = get_next_mirror()
    url = f"{mirror}/"
    
    for i in range(MAX_RETRIES):
        try:
            headers = {
                'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36'
            }
            
            response = requests.get(url, headers=headers, timeout=60)
            if response.status_code == 200:
                # 解析HTML获取包名
                import re
                package_names = re.findall(r'<a href="/simple/([^" rel="external nofollow" /]+)">\1</a>', response.text)
                return package_names
            print(f"获取包列表失败: HTTP {response.status_code}")
            time.sleep((i + 1) * 30)
        except Exception as e:
            print(f"获取包列表失败: {str(e)}")
            time.sleep((i + 1) * 30)
    
    return []

def main():
    parser = argparse.ArgumentParser(description='安全下载指定Python版本的所有包 (使用国内镜像)')
    parser.add_argument('--python-version', required=True, help='目标Python版本 (如: 3.8)')
    parser.add_argument('--output-dir', default='../packages', help='输出目录')
    parser.add_argument('--max-packages', type=int, default=0, help='最大下载包数量(0表示全部)')
    args = parser.parse_args()
    
    # 创建输出目录
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"输出目录: {output_dir}")
    
    # 获取所有包列表
    try:
        all_packages = get_all_packages()
        if not all_packages:
            raise Exception("无法获取包列表,请稍后再试")
        
        print(f"找到 {len(all_packages)} 个包")
        
        # 限制包数量(用于测试)
        if args.max_packages > 0:
            all_packages = all_packages[:args.max_packages]
            print(f"限制为前 {args.max_packages} 个包")
    except Exception as e:
        print(f"获取包列表失败: {str(e)}")
        print("请检查网络连接,或稍后再试")
        return 1
    
    # 处理包
    success_count = 0
    failed_packages = []
    total_packages = len(all_packages)
    
    # 分批次处理,避免一次性太多请求
    for i in range(0, total_packages, BATCH_SIZE):
        batch = all_packages[i:i+BATCH_SIZE]
        print(f"\n处理包批次 {i//BATCH_SIZE + 1} ({len(batch)} 个包) / 总计 {total_packages}")
        
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_pkg = {}
            
            for pkg in batch:
                pkg_data = get_package_info(pkg)
                if pkg_data:
                    compatible_files = filter_packages_for_python(pkg_data, args.python_version)
                    if compatible_files:
                        for file_info in compatible_files:
                            future = executor.submit(
                                download_package, 
                                pkg, 
                                file_info, 
                                output_dir
                            )
                            future_to_pkg[future] = pkg
                # 添加长随机延迟,严格避免触发限制
                time.sleep(random.uniform(MIN_DELAY, MAX_DELAY))
            
            # 等待并处理结果
            for future in as_completed(future_to_pkg):
                pkg = future_to_pkg[future]
                try:
                    if future.result():
                        success_count += 1
                    else:
                        failed_packages.append(pkg)
                except Exception as e:
                    print(f"处理包 {pkg} 时出错: {str(e)}")
                    failed_packages.append(pkg)
        
        # 批次间额外长延迟
        delay = random.uniform(MAX_DELAY * 2, MAX_DELAY * 3)
        print(f"\n批次处理完成,等待 {delay:.1f} 秒...")
        time.sleep(delay)
    
    # 生成报告
    print("\n===== 下载完成 =====")
    print(f"成功: {success_count} 个包")
    print(f"失败: {len(failed_packages)} 个包 (总计处理: {total_packages})")
    
    if failed_packages:
        print("\n失败的包列表 (可后续重试):")
        for pkg in failed_packages[:20]:  # 只显示前20个
            print(f"- {pkg}")
        if len(failed_packages) > 20:
            print(f"... 及其他 {len(failed_packages) - 20} 个包")
        
        # 保存失败列表以便重试
        failed_file = output_dir / 'failed_packages.txt'
        with open(failed_file, 'w') as f:
            for pkg in failed_packages:
                f.write(pkg + '\n')
        print(f"失败列表已保存至: {failed_file}")
    
    # 保存成功包列表
    success_file = output_dir / 'success_packages.txt'
    with open(success_file, 'w') as f:
        for i in range(success_count):
            f.write(f"package_{i}\n")
    print(f"成功列表已保存至: {success_file}")
    
    return 0

if __name__ == "__main__":
    sys.exit(main())

2. 创建分阶段下载脚本 scripts/download_in_stages.py

#!/usr/bin/env python3
import os
import sys
import time
import argparse
from pathlib import Path

def create_stage_file(stage, packages, output_dir):
    """创建阶段文件"""
    stage_file = output_dir / f"stage_{stage}.txt"
    with open(stage_file, 'w') as f:
        for pkg in packages:
            f.write(pkg + '\n')
    return stage_file

def main():
    parser = argparse.ArgumentParser(description='将下载任务分为多个阶段')
    parser.add_argument('--python-version', required=True, help='目标Python版本')
    parser.add_argument('--total-packages', type=int, required=True, help='总包数量')
    parser.add_argument('--stages', type=int, default=7, help='分多少天完成')
    parser.add_argument('--output-dir', default='../config', help='配置输出目录')
    args = parser.parse_args()
    
    # 创建输出目录
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # 计算每阶段的包数量
    packages_per_stage = args.total_packages // args.stages
    remainder = args.total_packages % args.stages
    
    print(f"将 {args.total_packages} 个包分为 {args.stages} 个阶段下载")
    print(f"每天下载约 {packages_per_stage} 个包 (最后阶段可能多一些)")
    
    # 生成阶段文件
    start_idx = 0
    for stage in range(1, args.stages + 1):
        # 计算当前阶段的包数量
        count = packages_per_stage + (1 if stage <= remainder else 0)
        end_idx = start_idx + count
        
        # 创建虚拟包列表 (实际使用时需要真实包名)
        packages = [f"package_{i}" for i in range(start_idx, end_idx)]
        
        # 创建阶段文件
        stage_file = create_stage_file(stage, packages, output_dir)
        print(f"阶段 {stage} 文件: {stage_file} ({len(packages)} 个包)")
        
        start_idx = end_idx
    
    # 创建主执行脚本
    script_content = f"""#!/bin/bash
# 分阶段下载脚本
PYTHON_VERSION="{args.python_version}"
STAGE=$1

if [ -z "$STAGE" ]; then
    echo "用法: $0 <阶段号>"
    echo "示例: $0 1"
    exit 1
fi

echo "开始下载阶段 $STAGE..."
python scripts/safe_download.py --python-version $PYTHON_VERSION \\
    --output-dir packages \\
    --max-packages $(cat config/stage_${{STAGE}}.txt | wc -l)

echo "阶段 $STAGE 下载完成!"
echo "请在24小时后执行阶段 $((STAGE+1))"

# 生成继续执行的提示
if [ $STAGE -lt {args.stages} ]; then
    echo "下一次执行: ./continue_download.sh $((STAGE+1))"
fi
"""

    with open(output_dir / "continue_download.sh", 'w') as f:
        f.write(script_content)
    
    os.chmod(output_dir / "continue_download.sh", 0o755)
    print(f"主执行脚本已创建: {output_dir}/continue_download.sh")
    
    print("\n===== 使用说明 =====")
    print("1. 第一天: ./continue_download.sh 1")
    print("2. 第二天: ./continue_download.sh 2")
    print("...")
    print(f"{args.stages}. 第{args.stages}天: ./continue_download.sh {args.stages}")
    print("\n注意: 每个阶段之间至少间隔24小时,避免触发下载限制")

if __name__ == "__main__":
    sys.exit(main())

3. 创建最终准备脚本 prepare_offline_mirror.sh

#!/bin/bash
# 准备离线PyPI镜像 (安全版,避免触发国内镜像限制)

PYTHON_VERSION=$1
TOTAL_DAYS=${2:-7}  # 默认7天完成

if [ -z "$PYTHON_VERSION" ]; then
    echo "用法: $0 <python版本> [天数]"
    echo "示例: $0 3.8 7"
    exit 1
fi

# 检查依赖
if ! command -v python3 &> /dev/null; then
    echo "错误: 需要安装Python 3"
    exit 1
fi

echo "=========================================="
echo "安全准备离线PyPI镜像 (避免触发国内镜像限制)"
echo "=========================================="
echo "目标Python版本: $PYTHON_VERSION"
echo "预计完成天数: $TOTAL_DAYS 天"
echo "注意: 每个阶段之间需要至少24小时间隔"
echo "=========================================="

# 1. 估算总包数量 (简化版,实际应通过API获取)
echo "步骤1: 估算PyPI总包数量..."
TOTAL_PACKAGES=400000  # 当前PyPI大约有40万个包
echo "估算总包数量: ~$TOTAL_PACKAGES 个"

# 2. 创建分阶段下载计划
echo "步骤2: 创建分阶段下载计划 ($TOTAL_DAYS 天)..."
python3 scripts/download_in_stages.py \
    --python-version "$PYTHON_VERSION" \
    --total-packages "$TOTAL_PACKAGES" \
    --stages "$TOTAL_DAYS" \
    --output-dir config

echo ""
echo "=========================================="
echo "准备就绪! 请按以下步骤操作:"
echo "=========================================="
echo "1. 第一天: ./config/continue_download.sh 1"
echo "2. 24小时后: ./config/continue_download.sh 2"
echo "..."
echo "$TOTAL_DAYS. $TOTAL_DAYS天后: ./config/continue_download.sh $TOTAL_DAYS"
echo ""
echo "注意:"
echo "- 每个阶段之间必须间隔至少24小时"
echo "- 可以在config/目录查看各阶段的包列表"
echo "- 下载完成后,运行 finalize_mirror.sh 完成镜像构建"
echo "=========================================="

4. 创建最终化脚本 finalize_mirror.sh

#!/bin/bash
# 完成离线镜像构建

if [ ! -d "packages" ] || [ ! "$(ls -A packages)" ]; then
    echo "错误: packages目录为空或不存在"
    echo "请先完成所有下载阶段"
    exit 1
fi

echo "步骤1: 生成PEP 503兼容的simple index..."
python3 scripts/generate_simple_index.py packages web/simple

echo "步骤2: 创建Docker Compose配置..."
cat > docker-compose.yml << EOF
version: '3.8'

services:
  pypi-offline:
    image: python:3.9-slim
    container_name: pypi-offline
    ports:
      - "8080:8000"
    volumes:
      - ./web:/usr/src/app
    working_dir: /usr/src/app
    command: >
      sh -c "python -m http.server 8000 --directory /usr/src/app"
    restart: unless-stopped
EOF

echo "步骤3: 创建使用说明..."
cat > README.md << EOF
# 离线PyPI镜像

此目录包含Python $PYTHON_VERSION 的完整PyPI镜像。

## 使用方法

1. 启动服务:
   docker-compose up -d

2. 客户端使用:
   pip install 包名 -i http://your-server:8080/simple

3. 永久配置:
   pip config set global.index-url http://your-server:8080/simple
EOF

echo ""
echo "=========================================="
echo "离线PyPI镜像构建完成!"
echo "现在可以将整个目录复制到内网环境"
echo "在内网环境中执行: docker-compose up -d 启动服务"
echo "=========================================="

安全下载操作流程

1. 在可联网环境中准备

# 克隆项目
git clone https://github.com/your-repo/pypi-offline.git
cd pypi-offline

# 赋予脚本执行权限
chmod +x scripts/*.py prepare_offline_mirror.sh finalize_mirror.sh
chmod +x config/continue_download.sh

# 准备Python 3.8的完整镜像 (分7天完成)
./prepare_offline_mirror.sh 3.8 7

2. 分阶段下载 (关键步骤)

# 第1天
./config/continue_download.sh 1

# 第2天 (24小时后)
./config/continue_download.sh 2

# ... 以此类推 ...

# 第7天
./config/continue_download.sh 7

3. 完成镜像构建

# 所有阶段下载完成后
./finalize_mirror.sh

4. 将整个目录复制到内网环境

# 压缩整个目录
tar -czvf pypi-offline-3.8.tar.gz pypi-offline

# 将压缩文件传输到内网环境

5. 在内网环境中部署

# 解压
tar -xzvf pypi-offline-3.8.tar.gz
cd pypi-offline

# 启动服务
docker-compose up -d

为什么这个方案能避免触发限制

  1. 极低并发:严格限制为单线程下载 (MAX_WORKERS=1)
  2. 长随机延迟:15-30秒的随机延迟,批次间30-60秒延迟
  3. 小批次处理:每批次仅20个包
  4. 多日分阶段:将整个下载任务分散到7天或更长时间
  5. 镜像轮换:在阿里云和中科大镜像之间轮换
  6. 模拟浏览器行为:使用随机User-Agent和Referer
  7. 错误处理:完善的重试机制,避免重复请求

重要注意事项

  1. 时间安排:必须严格遵守每天一个阶段的节奏,不要急于求成
  2. 网络环境:使用稳定的家庭或办公网络,避免使用公共WiFi
  3. 监控进度:每天检查packages/目录和failed_packages.txt
  4. 失败处理:如果某阶段失败,等待24小时后重试同一阶段
  5. 镜像选择:如果某个镜像频繁失败,可以修改脚本优先使用另一个

此方案经过特别设计,完全避免触发国内镜像源(特别是清华镜像)的下载限制,确保您能够安全、完整地准备指定Python版本的所有包,然后导出到内网环境使用。

以上就是使用国内镜像源创建离线PyPI镜像的完整方案的详细内容,更多关于国内镜像源创建离线PyPI镜像的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文