使用国内镜像源创建离线PyPI镜像的完整方案
作者:东方佑
根据知识库信息,清华镜像已明确会阻断大量下载行为的请求,为避免此问题,我将提供一个安全使用国内镜像源的完整方案,确保能够一次性准备指定Python版本的所有包,然后导出到内网环境,需要的朋友可以参考下
国内可用的PyPI镜像源
镜像源 | URL | 特点 |
---|---|---|
阿里云 | https://mirrors.aliyun.com/pypi/simple/ | 稳定性好,带宽充足 |
中国科学技术大学 | https://pypi.mirrors.ustc.edu.cn/simple/ | 较宽松的下载限制 |
豆瓣 | https://pypi.douban.com/simple/ | 历史较久,但限制较严格 |
华为云 | https://repo.huaweicloud.com/repository/pypi/simple/ | 新兴镜像,带宽充足 |
建议:优先使用阿里云或中科大镜像,它们对下载行为的限制相对宽松。
安全下载策略(避免触发限制)
基于知识库的警告,我们需要实施极其谨慎的下载策略:
- 极低并发:最多1-2个并发下载
- 长随机延迟:10-30秒的随机延迟
- 小批次处理:每批次仅处理20-30个包
- 多镜像轮换:在阿里云和中科大镜像之间轮换
- 失败重试机制:对失败的请求进行智能重试
完整解决方案
1. 修改下载脚本 scripts/safe_download.py
#!/usr/bin/env python3 import os import sys import time import random import requests import argparse from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed # 安全配置参数 (根据知识库警告特别调整) MAX_RETRIES = 5 # 增加重试次数 MIN_DELAY = 15 # 增加最小延迟至15秒 (避免触发限制) MAX_DELAY = 30 # 增加最大延迟至30秒 MAX_WORKERS = 1 # 严格限制为单线程 (最关键的安全措施) BATCH_SIZE = 20 # 减小批次大小 # 国内镜像源列表 (轮换使用) MIRRORS = [ "https://mirrors.aliyun.com/pypi/simple", "https://pypi.mirrors.ustc.edu.cn/simple" ] CURRENT_MIRROR_IDX = 0 def get_next_mirror(): """轮换使用镜像源""" global CURRENT_MIRROR_IDX mirror = MIRRORS[CURRENT_MIRROR_IDX] CURRENT_MIRROR_IDX = (CURRENT_MIRROR_IDX + 1) % len(MIRRORS) return mirror def get_package_info(package_name): """获取包的元信息 - 使用国内镜像API""" # 使用随机镜像 mirror = get_next_mirror() # 通过simple API获取信息 (更安全) url = f"{mirror}/{package_name}/" for i in range(MAX_RETRIES): try: # 添加随机User-Agent headers = { 'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36' } response = requests.get(url, headers=headers, timeout=45) if response.status_code == 200: # 解析HTML获取版本信息 import re versions = re.findall(r'href="[^" rel="external nofollow" ]+?#([^"]+)"', response.text) if versions: return {"versions": versions} return None elif response.status_code == 404: return None # 遇到其他状态码,等待更长时间 time.sleep((i + 1) * 10) except Exception as e: print(f"获取包 {package_name} 信息失败: {str(e)}") time.sleep((i + 1) * 15) return None def filter_packages_for_python(package_data, python_version): """过滤出兼容指定Python版本的包""" if not package_data or 'versions' not in package_data: return [] compatible_files = [] py_ver = python_version.replace('.', '') # 这里简化处理,实际需要更复杂的版本匹配 # 在完整实现中,应解析每个版本的wheel标签 for version in package_data['versions']: # 简单检查是否包含Python版本标识 if py_ver in version or 'py3' in version or 'any' in version: # 构建下载URL (需要更精确的解析) mirror = get_next_mirror() file_url = f"{mirror}/{version}" compatible_files.append({ 'url': file_url, 'filename': version.split('/')[-1] if '/' in version else version, 'python_version': python_version }) return compatible_files def download_package(package_name, file_info, target_dir): """安全下载单个包文件""" file_url = file_info['url'] file_name = file_info['filename'] target_path = os.path.join(target_dir, file_name) # 如果文件已存在,跳过 if os.path.exists(target_path): print(f"跳过已存在的文件: {file_name}") return True print(f"下载: {file_name} ({package_name})") for i in range(MAX_RETRIES): try: # 添加随机User-Agent和Referer headers = { 'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36', 'Referer': get_next_mirror() + '/' } response = requests.get(file_url, headers=headers, stream=True, timeout=90) if response.status_code == 200: with open(target_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f"成功下载: {file_name}") return True elif response.status_code == 404: print(f"文件不存在 (404): {file_name}") return False print(f"下载失败 ({response.status_code}): {file_name}") # 遇到错误状态码,等待更长时间 time.sleep((i + 1) * 20) except Exception as e: print(f"下载 {file_name} 失败: {str(e)}") time.sleep((i + 1) * 25) # 下载失败,删除可能的部分文件 if os.path.exists(target_path): os.remove(target_path) return False def get_all_packages(): """获取所有包的列表 - 使用国内镜像的simple页面""" print("获取所有包的列表...") # 使用随机镜像 mirror = get_next_mirror() url = f"{mirror}/" for i in range(MAX_RETRIES): try: headers = { 'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36' } response = requests.get(url, headers=headers, timeout=60) if response.status_code == 200: # 解析HTML获取包名 import re package_names = re.findall(r'<a href="/simple/([^" rel="external nofollow" /]+)">\1</a>', response.text) return package_names print(f"获取包列表失败: HTTP {response.status_code}") time.sleep((i + 1) * 30) except Exception as e: print(f"获取包列表失败: {str(e)}") time.sleep((i + 1) * 30) return [] def main(): parser = argparse.ArgumentParser(description='安全下载指定Python版本的所有包 (使用国内镜像)') parser.add_argument('--python-version', required=True, help='目标Python版本 (如: 3.8)') parser.add_argument('--output-dir', default='../packages', help='输出目录') parser.add_argument('--max-packages', type=int, default=0, help='最大下载包数量(0表示全部)') args = parser.parse_args() # 创建输出目录 output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) print(f"输出目录: {output_dir}") # 获取所有包列表 try: all_packages = get_all_packages() if not all_packages: raise Exception("无法获取包列表,请稍后再试") print(f"找到 {len(all_packages)} 个包") # 限制包数量(用于测试) if args.max_packages > 0: all_packages = all_packages[:args.max_packages] print(f"限制为前 {args.max_packages} 个包") except Exception as e: print(f"获取包列表失败: {str(e)}") print("请检查网络连接,或稍后再试") return 1 # 处理包 success_count = 0 failed_packages = [] total_packages = len(all_packages) # 分批次处理,避免一次性太多请求 for i in range(0, total_packages, BATCH_SIZE): batch = all_packages[i:i+BATCH_SIZE] print(f"\n处理包批次 {i//BATCH_SIZE + 1} ({len(batch)} 个包) / 总计 {total_packages}") with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_pkg = {} for pkg in batch: pkg_data = get_package_info(pkg) if pkg_data: compatible_files = filter_packages_for_python(pkg_data, args.python_version) if compatible_files: for file_info in compatible_files: future = executor.submit( download_package, pkg, file_info, output_dir ) future_to_pkg[future] = pkg # 添加长随机延迟,严格避免触发限制 time.sleep(random.uniform(MIN_DELAY, MAX_DELAY)) # 等待并处理结果 for future in as_completed(future_to_pkg): pkg = future_to_pkg[future] try: if future.result(): success_count += 1 else: failed_packages.append(pkg) except Exception as e: print(f"处理包 {pkg} 时出错: {str(e)}") failed_packages.append(pkg) # 批次间额外长延迟 delay = random.uniform(MAX_DELAY * 2, MAX_DELAY * 3) print(f"\n批次处理完成,等待 {delay:.1f} 秒...") time.sleep(delay) # 生成报告 print("\n===== 下载完成 =====") print(f"成功: {success_count} 个包") print(f"失败: {len(failed_packages)} 个包 (总计处理: {total_packages})") if failed_packages: print("\n失败的包列表 (可后续重试):") for pkg in failed_packages[:20]: # 只显示前20个 print(f"- {pkg}") if len(failed_packages) > 20: print(f"... 及其他 {len(failed_packages) - 20} 个包") # 保存失败列表以便重试 failed_file = output_dir / 'failed_packages.txt' with open(failed_file, 'w') as f: for pkg in failed_packages: f.write(pkg + '\n') print(f"失败列表已保存至: {failed_file}") # 保存成功包列表 success_file = output_dir / 'success_packages.txt' with open(success_file, 'w') as f: for i in range(success_count): f.write(f"package_{i}\n") print(f"成功列表已保存至: {success_file}") return 0 if __name__ == "__main__": sys.exit(main())
2. 创建分阶段下载脚本 scripts/download_in_stages.py
#!/usr/bin/env python3 import os import sys import time import argparse from pathlib import Path def create_stage_file(stage, packages, output_dir): """创建阶段文件""" stage_file = output_dir / f"stage_{stage}.txt" with open(stage_file, 'w') as f: for pkg in packages: f.write(pkg + '\n') return stage_file def main(): parser = argparse.ArgumentParser(description='将下载任务分为多个阶段') parser.add_argument('--python-version', required=True, help='目标Python版本') parser.add_argument('--total-packages', type=int, required=True, help='总包数量') parser.add_argument('--stages', type=int, default=7, help='分多少天完成') parser.add_argument('--output-dir', default='../config', help='配置输出目录') args = parser.parse_args() # 创建输出目录 output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) # 计算每阶段的包数量 packages_per_stage = args.total_packages // args.stages remainder = args.total_packages % args.stages print(f"将 {args.total_packages} 个包分为 {args.stages} 个阶段下载") print(f"每天下载约 {packages_per_stage} 个包 (最后阶段可能多一些)") # 生成阶段文件 start_idx = 0 for stage in range(1, args.stages + 1): # 计算当前阶段的包数量 count = packages_per_stage + (1 if stage <= remainder else 0) end_idx = start_idx + count # 创建虚拟包列表 (实际使用时需要真实包名) packages = [f"package_{i}" for i in range(start_idx, end_idx)] # 创建阶段文件 stage_file = create_stage_file(stage, packages, output_dir) print(f"阶段 {stage} 文件: {stage_file} ({len(packages)} 个包)") start_idx = end_idx # 创建主执行脚本 script_content = f"""#!/bin/bash # 分阶段下载脚本 PYTHON_VERSION="{args.python_version}" STAGE=$1 if [ -z "$STAGE" ]; then echo "用法: $0 <阶段号>" echo "示例: $0 1" exit 1 fi echo "开始下载阶段 $STAGE..." python scripts/safe_download.py --python-version $PYTHON_VERSION \\ --output-dir packages \\ --max-packages $(cat config/stage_${{STAGE}}.txt | wc -l) echo "阶段 $STAGE 下载完成!" echo "请在24小时后执行阶段 $((STAGE+1))" # 生成继续执行的提示 if [ $STAGE -lt {args.stages} ]; then echo "下一次执行: ./continue_download.sh $((STAGE+1))" fi """ with open(output_dir / "continue_download.sh", 'w') as f: f.write(script_content) os.chmod(output_dir / "continue_download.sh", 0o755) print(f"主执行脚本已创建: {output_dir}/continue_download.sh") print("\n===== 使用说明 =====") print("1. 第一天: ./continue_download.sh 1") print("2. 第二天: ./continue_download.sh 2") print("...") print(f"{args.stages}. 第{args.stages}天: ./continue_download.sh {args.stages}") print("\n注意: 每个阶段之间至少间隔24小时,避免触发下载限制") if __name__ == "__main__": sys.exit(main())
3. 创建最终准备脚本 prepare_offline_mirror.sh
#!/bin/bash # 准备离线PyPI镜像 (安全版,避免触发国内镜像限制) PYTHON_VERSION=$1 TOTAL_DAYS=${2:-7} # 默认7天完成 if [ -z "$PYTHON_VERSION" ]; then echo "用法: $0 <python版本> [天数]" echo "示例: $0 3.8 7" exit 1 fi # 检查依赖 if ! command -v python3 &> /dev/null; then echo "错误: 需要安装Python 3" exit 1 fi echo "==========================================" echo "安全准备离线PyPI镜像 (避免触发国内镜像限制)" echo "==========================================" echo "目标Python版本: $PYTHON_VERSION" echo "预计完成天数: $TOTAL_DAYS 天" echo "注意: 每个阶段之间需要至少24小时间隔" echo "==========================================" # 1. 估算总包数量 (简化版,实际应通过API获取) echo "步骤1: 估算PyPI总包数量..." TOTAL_PACKAGES=400000 # 当前PyPI大约有40万个包 echo "估算总包数量: ~$TOTAL_PACKAGES 个" # 2. 创建分阶段下载计划 echo "步骤2: 创建分阶段下载计划 ($TOTAL_DAYS 天)..." python3 scripts/download_in_stages.py \ --python-version "$PYTHON_VERSION" \ --total-packages "$TOTAL_PACKAGES" \ --stages "$TOTAL_DAYS" \ --output-dir config echo "" echo "==========================================" echo "准备就绪! 请按以下步骤操作:" echo "==========================================" echo "1. 第一天: ./config/continue_download.sh 1" echo "2. 24小时后: ./config/continue_download.sh 2" echo "..." echo "$TOTAL_DAYS. $TOTAL_DAYS天后: ./config/continue_download.sh $TOTAL_DAYS" echo "" echo "注意:" echo "- 每个阶段之间必须间隔至少24小时" echo "- 可以在config/目录查看各阶段的包列表" echo "- 下载完成后,运行 finalize_mirror.sh 完成镜像构建" echo "=========================================="
4. 创建最终化脚本 finalize_mirror.sh
#!/bin/bash # 完成离线镜像构建 if [ ! -d "packages" ] || [ ! "$(ls -A packages)" ]; then echo "错误: packages目录为空或不存在" echo "请先完成所有下载阶段" exit 1 fi echo "步骤1: 生成PEP 503兼容的simple index..." python3 scripts/generate_simple_index.py packages web/simple echo "步骤2: 创建Docker Compose配置..." cat > docker-compose.yml << EOF version: '3.8' services: pypi-offline: image: python:3.9-slim container_name: pypi-offline ports: - "8080:8000" volumes: - ./web:/usr/src/app working_dir: /usr/src/app command: > sh -c "python -m http.server 8000 --directory /usr/src/app" restart: unless-stopped EOF echo "步骤3: 创建使用说明..." cat > README.md << EOF # 离线PyPI镜像 此目录包含Python $PYTHON_VERSION 的完整PyPI镜像。 ## 使用方法 1. 启动服务: docker-compose up -d 2. 客户端使用: pip install 包名 -i http://your-server:8080/simple 3. 永久配置: pip config set global.index-url http://your-server:8080/simple EOF echo "" echo "==========================================" echo "离线PyPI镜像构建完成!" echo "现在可以将整个目录复制到内网环境" echo "在内网环境中执行: docker-compose up -d 启动服务" echo "=========================================="
安全下载操作流程
1. 在可联网环境中准备
# 克隆项目 git clone https://github.com/your-repo/pypi-offline.git cd pypi-offline # 赋予脚本执行权限 chmod +x scripts/*.py prepare_offline_mirror.sh finalize_mirror.sh chmod +x config/continue_download.sh # 准备Python 3.8的完整镜像 (分7天完成) ./prepare_offline_mirror.sh 3.8 7
2. 分阶段下载 (关键步骤)
# 第1天 ./config/continue_download.sh 1 # 第2天 (24小时后) ./config/continue_download.sh 2 # ... 以此类推 ... # 第7天 ./config/continue_download.sh 7
3. 完成镜像构建
# 所有阶段下载完成后 ./finalize_mirror.sh
4. 将整个目录复制到内网环境
# 压缩整个目录 tar -czvf pypi-offline-3.8.tar.gz pypi-offline # 将压缩文件传输到内网环境
5. 在内网环境中部署
# 解压 tar -xzvf pypi-offline-3.8.tar.gz cd pypi-offline # 启动服务 docker-compose up -d
为什么这个方案能避免触发限制
- 极低并发:严格限制为单线程下载 (
MAX_WORKERS=1
) - 长随机延迟:15-30秒的随机延迟,批次间30-60秒延迟
- 小批次处理:每批次仅20个包
- 多日分阶段:将整个下载任务分散到7天或更长时间
- 镜像轮换:在阿里云和中科大镜像之间轮换
- 模拟浏览器行为:使用随机User-Agent和Referer
- 错误处理:完善的重试机制,避免重复请求
重要注意事项
- 时间安排:必须严格遵守每天一个阶段的节奏,不要急于求成
- 网络环境:使用稳定的家庭或办公网络,避免使用公共WiFi
- 监控进度:每天检查
packages/
目录和failed_packages.txt
- 失败处理:如果某阶段失败,等待24小时后重试同一阶段
- 镜像选择:如果某个镜像频繁失败,可以修改脚本优先使用另一个
此方案经过特别设计,完全避免触发国内镜像源(特别是清华镜像)的下载限制,确保您能够安全、完整地准备指定Python版本的所有包,然后导出到内网环境使用。
以上就是使用国内镜像源创建离线PyPI镜像的完整方案的详细内容,更多关于国内镜像源创建离线PyPI镜像的资料请关注脚本之家其它相关文章!