python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python打包到PyPI

Python一次性将指定版本所有包上传PyPI镜像解决方案

作者:东方佑

本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随小编一起学习一下

根据您的需求和知识库中的警告信息(清华镜像会阻断大量下载行为),我设计了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境。

为什么需要这个方案

知识库明确显示:清华镜像会检测到大量下载行为并阻断请求。直接尝试下载所有包会触发此限制,导致下载失败。我们需要一个谨慎、分阶段、有间隔的下载策略。

完整解决方案

1. 项目目录结构

mkdir -p pypi-offline/{config,scripts,packages}
cd pypi-offline

2. 创建智能下载脚本

scripts/safe_download.py

#!/usr/bin/env python3
import os
import sys
import time
import random
import requests
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

# 配置参数
MAX_RETRIES = 3
MIN_DELAY = 5  # 最小延迟(秒)
MAX_DELAY = 15  # 最大延迟(秒)
MAX_WORKERS = 2  # 最大并发数(避免触发限制)

# PyPI API端点
PYPI_SIMPLE = "https://pypi.org/simple"
# 使用官方PyPI而非清华镜像,避免触发限制
# 如果网络条件好,可考虑使用其他镜像

def get_package_info(package_name):
    """获取包的元信息"""
    url = f"https://pypi.org/pypi/{package_name}/json"
    for i in range(MAX_RETRIES):
        try:
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                return response.json()
            elif response.status_code == 404:
                return None
            time.sleep((i + 1) * 2)
        except Exception as e:
            print(f"获取包 {package_name} 信息失败: {str(e)}")
            time.sleep((i + 1) * 5)
    return None

def filter_packages_for_python(package_data, python_version):
    """过滤出兼容指定Python版本的包"""
    compatible_files = []
    py_ver = python_version.replace('.', '')
    
    for file_info in package_data['urls']:
        # 检查Python版本兼容性
        py_tag = file_info.get('python_version', '')
        if py_tag == 'source' or py_tag.startswith('py') or py_tag.startswith('cp' + py_ver):
            compatible_files.append(file_info)
    
    return compatible_files

def download_package(package_name, file_info, target_dir):
    """安全下载单个包文件"""
    file_url = file_info['url']
    file_name = file_info['filename']
    target_path = os.path.join(target_dir, file_name)
    
    # 如果文件已存在,跳过
    if os.path.exists(target_path):
        print(f"跳过已存在的文件: {file_name}")
        return True
    
    print(f"下载: {file_name} ({package_name})")
    
    for i in range(MAX_RETRIES):
        try:
            response = requests.get(file_url, stream=True, timeout=60)
            if response.status_code == 200:
                with open(target_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                print(f"成功下载: {file_name}")
                return True
            print(f"下载失败 ({response.status_code}): {file_name}")
            time.sleep((i + 1) * 5)
        except Exception as e:
            print(f"下载 {file_name} 失败: {str(e)}")
            time.sleep((i + 1) * 5)
    
    # 下载失败,删除可能的部分文件
    if os.path.exists(target_path):
        os.remove(target_path)
    return False

def get_all_packages():
    """获取所有包的列表"""
    print("获取所有包的列表...")
    response = requests.get(f"{PYPI_SIMPLE}/", timeout=30)
    if response.status_code != 200:
        raise Exception(f"无法获取包列表: HTTP {response.status_code}")
    
    # 解析HTML获取包名
    import re
    package_names = re.findall(r'<a href="/simple/([^" rel="external nofollow" /]+)">\1</a>', response.text)
    return package_names

def main():
    parser = argparse.ArgumentParser(description='安全下载指定Python版本的所有包')
    parser.add_argument('--python-version', required=True, help='目标Python版本 (如: 3.8)')
    parser.add_argument('--output-dir', default='../packages', help='输出目录')
    args = parser.parse_args()
    
    # 创建输出目录
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"输出目录: {output_dir}")
    
    # 获取所有包列表
    try:
        all_packages = get_all_packages()
        print(f"找到 {len(all_packages)} 个包")
    except Exception as e:
        print(f"获取包列表失败: {str(e)}")
        print("请尝试使用清华镜像的简单页面 API (需要处理 HTML)")
        # 作为备选方案,可以使用清华镜像的简单页面
        # all_packages = get_packages_from_tuna()
        return 1
    
    # 处理包
    success_count = 0
    failed_packages = []
    
    # 分批次处理,避免一次性太多请求
    batch_size = 50
    for i in range(0, len(all_packages), batch_size):
        batch = all_packages[i:i+batch_size]
        print(f"\n处理包批次 {i//batch_size + 1} ({len(batch)} 个包)")
        
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_pkg = {}
            
            for pkg in batch:
                pkg_data = get_package_info(pkg)
                if pkg_data:
                    compatible_files = filter_packages_for_python(pkg_data, args.python_version)
                    if compatible_files:
                        for file_info in compatible_files:
                            future = executor.submit(
                                download_package, 
                                pkg, 
                                file_info, 
                                output_dir
                            )
                            future_to_pkg[future] = pkg
                # 添加随机延迟,避免触发限制
                time.sleep(random.uniform(MIN_DELAY, MAX_DELAY))
            
            # 等待并处理结果
            for future in as_completed(future_to_pkg):
                pkg = future_to_pkg[future]
                try:
                    if future.result():
                        success_count += 1
                    else:
                        failed_packages.append(pkg)
                except Exception as e:
                    print(f"处理包 {pkg} 时出错: {str(e)}")
                    failed_packages.append(pkg)
        
        # 批次间额外延迟
        print(f"\n批次处理完成,等待 {MAX_DELAY*2} 秒...")
        time.sleep(MAX_DELAY * 2)
    
    # 生成报告
    print("\n===== 下载完成 =====")
    print(f"成功: {success_count} 个包")
    print(f"失败: {len(failed_packages)} 个包")
    
    if failed_packages:
        print("\n失败的包列表 (可后续重试):")
        for pkg in failed_packages[:20]:  # 只显示前20个
            print(f"- {pkg}")
        if len(failed_packages) > 20:
            print(f"... 及其他 {len(failed_packages) - 20} 个包")
        
        # 保存失败列表以便重试
        with open(output_dir / 'failed_packages.txt', 'w') as f:
            for pkg in failed_packages:
                f.write(pkg + '\n')
    
    return 0

if __name__ == "__main__":
    sys.exit(main())

3. 创建包清单生成脚本

scripts/generate_simple_index.py

#!/usr/bin/env python3
import os
import sys
from pathlib import Path
import re
import html

def generate_simple_index(packages_dir, output_dir):
    """生成符合PEP 503的simple index"""
    packages_dir = Path(packages_dir)
    output_dir = Path(output_dir)
    
    # 创建输出目录
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # 收集所有包名
    package_names = set()
    for filename in os.listdir(packages_dir):
        # 从文件名提取包名 (PEP 427 格式)
        match = re.match(r'^([a-zA-Z0-9_-]+)(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename)
        if match:
            package_name = match.group(1).lower()
            # 规范化包名 (PEP 503)
            package_name = package_name.replace('_', '-')
            package_names.add(package_name)
    
    # 为每个包生成索引页面
    for package in package_names:
        index_content = f'<html><head><title>Links for {package}</title></head>\n'
        index_content += f'<body>\n<h1>Links for {package}</h1>\n'
        
        # 找出该包的所有文件
        for filename in os.listdir(packages_dir):
            if re.match(f'^{re.escape(package)}(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename, re.IGNORECASE):
                file_url = f'../{filename}'
                index_content += f'<a href="{file_url}" rel="external nofollow" >{html.escape(filename)}</a>
\n'
        
        index_content += '</body></html>'
        
        # 保存索引页面
        package_dir = output_dir / package
        package_dir.mkdir(exist_ok=True)
        with open(package_dir / 'index.html', 'w', encoding='utf-8') as f:
            f.write(index_content)
    
    # 生成根索引页面
    root_index = '<html><head><title>Simple Index</title></head>\n'
    root_index += '<body>\n<h1>Simple Index</h1>\n'
    
    for package in sorted(package_names):
        root_index += f'<a href="{package}/" rel="external nofollow" >{html.escape(package)}</a>
\n'
    
    root_index += '</body></html>'
    
    with open(output_dir / 'index.html', 'w', encoding='utf-8') as f:
        f.write(root_index)
    
    print(f"成功生成 {len(package_names)} 个包的索引")

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("用法: python generate_simple_index.py <packages_dir> <output_dir>")
        sys.exit(1)
    
    generate_simple_index(sys.argv[1], sys.argv[2])

4. 创建 Docker Compose 配置docker-compose.yml

version: '3.8'

services:
  pypi-offline:
    image: python:3.9-slim
    container_name: pypi-offline
    ports:
      - "8080:8000"
    volumes:
      - ./web:/usr/src/app
      - ./packages:/usr/src/app/packages
    working_dir: /usr/src/app
    command: >
      sh -c "python -m http.server 8000 --directory /usr/src/app"
    restart: unless-stopped

5. 创建 Web 服务目录结构

mkdir -p web/{simple,packages}

6. 创建准备脚本

prepare_offline_mirror.sh

#!/bin/bash
# 准备离线PyPI镜像

PYTHON_VERSION=$1
if [ -z "$PYTHON_VERSION" ]; then
    echo "用法: $0 <python版本>"
    echo "示例: $0 3.8"
    exit 1
fi

# 检查依赖
if ! command -v python3 &> /dev/null; then
    echo "错误: 需要安装Python 3"
    exit 1
fi

# 1. 安全下载所有兼容指定Python版本的包
echo "步骤1: 安全下载所有兼容Python $PYTHON_VERSION 的包"
python3 scripts/safe_download.py --python-version "$PYTHON_VERSION" --output-dir packages

# 2. 生成simple index
echo "步骤2: 生成PEP 503兼容的simple index"
python3 scripts/generate_simple_index.py packages web/simple

# 3. 创建README
cat > web/README.md << EOF
# 离线PyPI镜像

此目录包含Python $PYTHON_VERSION 的完整PyPI镜像。

## 使用方法

1. 启动服务:
   docker-compose up -d

2. 客户端使用:
   pip install 包名 -i http://your-server:8080/simple
EOF

echo "离线PyPI镜像准备完成!"
echo "现在可以将整个pypi-offline目录复制到内网环境"
echo "在内网环境中执行: docker-compose up -d 启动服务"

完整操作流程

1. 在可联网的环境中准备镜像

# 克隆项目
git clone https://github.com/your-repo/pypi-offline.git
cd pypi-offline

# 赋予脚本执行权限
chmod +x scripts/*.py prepare_offline_mirror.sh

# 准备Python 3.8的完整镜像 (这将需要较长时间)
./prepare_offline_mirror.sh 3.8

# 准备Python 3.9的完整镜像 (可选)
# ./prepare_offline_mirror.sh 3.9

2. 验证下载结果

# 检查包数量
ls packages | wc -l

# 检查simple index
ls web/simple | wc -l

3. 将整个目录复制到内网环境

# 压缩整个目录
tar -czvf pypi-offline-3.8.tar.gz pypi-offline

# 将压缩文件传输到内网环境

4. 在内网环境中部署

# 解压
tar -xzvf pypi-offline-3.8.tar.gz
cd pypi-offline

# 启动服务
docker-compose up -d

5. 内网客户端使用

# 临时使用
pip install numpy -i http://pypi-offline-server:8080/simple

# 永久配置
pip config set global.index-url http://pypi-offline-server:8080/simple

方案优势

1.避免触发下载限制:

2.完整镜像:

3.轻量级部署:

4.可验证性:

注意事项

此方案确保您可以安全地一次性准备完整镜像,然后完全离线使用,完美符合您的内网部署需求。

以上就是Python一次性将指定版本所有包上传PyPI镜像解决方案的详细内容,更多关于Python打包到PyPI的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文