
A Complete Guide to Reading and Writing Compressed Data Files in Python

Author: Python×CATIA工业智造

Handling compressed files is a key skill for every Python developer working on data-intensive applications. This article takes a deep dive into the methods Python offers for reading and writing compressed files.

Introduction: The Core Value of Compressed Data Handling

In modern data-intensive applications, handling compressed files is a key skill for every Python developer. The standard library ships comprehensive support for it, yet many developers never tap its full potential. This article dissects Python's compression toolchain for reading and writing compressed files and, drawing on engineering practice, extends into advanced territory: performance tuning, concurrent processing, and error recovery.

1. Basic Compressed File Operations

1.1 GZIP Read/Write Basics

import gzip
import os
import shutil

def basic_gzip_operations():
    """Basic GZIP file operations."""
    # Create test data
    original_data = "This is the original data content\n" * 1000
    print(f"Original data size: {len(original_data)} characters")
    
    # Write a GZIP file in text mode
    with gzip.open('example.gz', 'wt', encoding='utf-8') as f:
        f.write(original_data)
    print("GZIP file written")
    
    # Read the GZIP file back
    with gzip.open('example.gz', 'rt', encoding='utf-8') as f:
        decompressed_data = f.read()
    
    print(f"Decompressed data size: {len(decompressed_data)} characters")
    print(f"Data consistent: {original_data == decompressed_data}")
    
    # Inspect the compressed file
    compressed_size = os.path.getsize('example.gz')
    compression_ratio = len(original_data) / compressed_size
    print(f"Compressed file size: {compressed_size} bytes")
    print(f"Compression ratio: {compression_ratio:.2f}:1")
    
    # Binary-mode read/write
    binary_data = original_data.encode('utf-8')
    with gzip.open('binary_example.gz', 'wb') as f:
        f.write(binary_data)
    
    with gzip.open('binary_example.gz', 'rb') as f:
        restored_binary = f.read()
        restored_text = restored_binary.decode('utf-8')
    
    print(f"Binary-mode consistency: {original_data == restored_text}")

# Run the example
basic_gzip_operations()
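
To compress an existing file on disk, there is no need to read it fully into memory first: shutil.copyfileobj streams it chunk by chunk into a gzip writer. A minimal sketch (the file names are illustrative):

import gzip
import shutil

def gzip_file(src_path, dst_path):
    # Stream src into a gzip file without loading it all at once
    with open(src_path, 'rb') as f_in, gzip.open(dst_path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

# gzip_file('example.log', 'example.log.gz')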

1.2 Support for Multiple Compression Formats

def multiple_compression_formats():
    """Work with several compression formats."""
    import bz2
    import lzma
    
    test_data = "Test data content " * 500
    print(f"Test data size: {len(test_data)} characters")
    
    # Describe each compression format handler
    compressors = {
        'gzip': {
            'module': gzip,
            'extension': '.gz',
            'description': 'GZIP format'
        },
        'bzip2': {
            'module': bz2,
            'extension': '.bz2',
            'description': 'BZIP2 format'
        },
        'lzma': {
            'module': lzma,
            'extension': '.xz',
            'description': 'LZMA format'
        }
    }
    
    results = {}
    
    for name, config in compressors.items():
        # Write the compressed file
        filename = f'example{config["extension"]}'
        
        with config['module'].open(filename, 'wt', encoding='utf-8') as f:
            f.write(test_data)
        
        # Read back and verify
        with config['module'].open(filename, 'rt', encoding='utf-8') as f:
            decompressed = f.read()
        
        compressed_size = os.path.getsize(filename)
        ratio = len(test_data) / compressed_size
        
        results[name] = {
            'compressed_size': compressed_size,
            'ratio': ratio,
            'consistent': test_data == decompressed
        }
        
        print(f"{config['description']}:")
        print(f"  Compressed size: {compressed_size} bytes")
        print(f"  Compression ratio: {ratio:.2f}:1")
        print(f"  Data consistent: {test_data == decompressed}")
    
    # Compare the formats
    best_compression = max(results.items(), key=lambda x: x[1]['ratio'])
    print(f"\nBest compression: {best_compression[0]} (ratio {best_compression[1]['ratio']:.2f}:1)")
    
    # Clean up
    for config in compressors.values():
        filename = f'example{config["extension"]}'
        if os.path.exists(filename):
            os.remove(filename)

# Run the example
multiple_compression_formats()

2. Advanced Compression Techniques

2.1 Compression Levels and Performance Tuning

def compression_level_tuning():
    """Tune the compression level for performance."""
    import time
    
    # Generate test data
    large_data = "Repetitive data for compression testing\n" * 10000
    binary_data = large_data.encode('utf-8')
    
    print(f"Original data size: {len(binary_data)} bytes")
    
    # Test several compression levels
    compression_levels = [1, 6, 9]  # 1 = fastest, 6 = default, 9 = best compression
    
    results = []
    
    for level in compression_levels:
        start_time = time.time()
        
        # Compress at the given level
        with gzip.open(f'level_{level}.gz', 'wb', compresslevel=level) as f:
            f.write(binary_data)
        
        compress_time = time.time() - start_time
        compressed_size = os.path.getsize(f'level_{level}.gz')
        ratio = len(binary_data) / compressed_size
        
        results.append({
            'level': level,
            'size': compressed_size,
            'ratio': ratio,
            'time': compress_time
        })
        
        print(f"Level {level}: {compressed_size} bytes, ratio {ratio:.2f}:1, took {compress_time:.3f}s")
    
    # Plot the results (requires the third-party matplotlib package)
    import matplotlib.pyplot as plt
    
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
    
    # Compression-ratio chart
    levels = [r['level'] for r in results]
    ratios = [r['ratio'] for r in results]
    ax1.bar(levels, ratios, color='skyblue')
    ax1.set_xlabel('Compression level')
    ax1.set_ylabel('Compression ratio')
    ax1.set_title('Compression level vs. ratio')
    
    # Timing chart
    times = [r['time'] for r in results]
    ax2.bar(levels, times, color='lightcoral')
    ax2.set_xlabel('Compression level')
    ax2.set_ylabel('Time (s)')
    ax2.set_title('Compression level vs. time')
    
    plt.tight_layout()
    plt.savefig('compression_performance.png')
    print("Performance chart saved to compression_performance.png")
    
    # Recommendations
    best_ratio = max(results, key=lambda x: x['ratio'])
    best_speed = min(results, key=lambda x: x['time'])
    
    print(f"\nBest compression: level {best_ratio['level']} (ratio {best_ratio['ratio']:.2f}:1)")
    print(f"Fastest compression: level {best_speed['level']} (took {best_speed['time']:.3f}s)")
    
    # Clean up
    for level in compression_levels:
        filename = f'level_{level}.gz'
        if os.path.exists(filename):
            os.remove(filename)

# Run the example
compression_level_tuning()
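
bz2 and lzma expose analogous knobs: bz2.compress takes compresslevel (1-9), while lzma.compress takes preset (0-9, optionally combined with lzma.PRESET_EXTREME). A quick illustration:

import bz2
import lzma

payload = b"example payload " * 1000
fast = bz2.compress(payload, compresslevel=1)                   # favor speed
small = lzma.compress(payload, preset=9 | lzma.PRESET_EXTREME)  # favor ratio
print(len(fast), len(small))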

2.2 Streaming Compression

def streaming_compression():
    """Process large data with streaming compression."""
    import time
    
    def generate_large_data(num_records=100000):
        """Generate large test data lazily."""
        for i in range(num_records):
            yield f"Record {i}: this is test data content " * 5 + "\n"
    
    # Streaming compressed write
    def stream_compress(filename, data_generator, compression_class=gzip):
        """Compress data as it is generated."""
        with compression_class.open(filename, 'wt', encoding='utf-8') as f:
            for record in data_generator:
                f.write(record)
                if f.tell() % 1000000 < len(record):  # report progress roughly every 1 MB
                    print(f"Written {f.tell()} bytes")
    
    # Streaming decompressed read
    def stream_decompress(filename, compression_class=gzip):
        """Decompress data line by line."""
        with compression_class.open(filename, 'rt', encoding='utf-8') as f:
            for line in f:
                yield line
    
    # Exercise the streaming pipeline
    print("Starting streaming compression...")
    start_time = time.time()
    
    stream_compress('stream_data.gz', generate_large_data(50000))
    compress_time = time.time() - start_time
    
    # Inspect the compressed file
    compressed_size = os.path.getsize('stream_data.gz')
    print(f"Compression done: {compressed_size} bytes, took {compress_time:.2f}s")
    
    # Streaming decompression and processing
    print("Starting streaming decompression...")
    start_time = time.time()
    
    record_count = 0
    for line in stream_decompress('stream_data.gz'):
        record_count += 1
        # Simulate per-record processing
        if record_count % 10000 == 0:
            print(f"Processed {record_count} records")
    
    decompress_time = time.time() - start_time
    print(f"Decompression done: {record_count} records, took {decompress_time:.2f}s")
    
    # Memory-profile comparison
    print("\nMemory usage comparison:")
    print("Streaming: constant, low memory footprint")
    print("Whole-file: must hold the entire dataset in memory")
    
    # Throughput statistics (regenerate the data to measure its raw size)
    total_data_size = sum(len(record) for record in generate_large_data(50000))
    print(f"Total data: {total_data_size} characters")
    print(f"Compression ratio: {total_data_size / compressed_size:.2f}:1")
    print(f"Total processing time: {compress_time + decompress_time:.2f}s")
    
    # Clean up
    if os.path.exists('stream_data.gz'):
        os.remove('stream_data.gz')

# Run the example
streaming_compression()

3. ZIP File Handling

3.1 Multi-File ZIP Archives

import zipfile

def zip_file_operations():
    """Basic ZIP archive operations."""
    # Create test files
    test_files = {
        'document.txt': "This is a text document\nSecond line of content\n",
        'data.json': '{"name": "test", "value": 123, "active": true}',
        'config.ini': "[settings]\nversion=1.0\nenabled=true\n"
    }
    
    for filename, content in test_files.items():
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"Created test file: {filename}")
    
    # Create the ZIP archive
    with zipfile.ZipFile('example.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
        for filename in test_files.keys():
            zipf.write(filename)
            print(f"Added to ZIP: {filename}")
    
    # Inspect the archive
    with zipfile.ZipFile('example.zip', 'r') as zipf:
        print(f"\nZIP archive info:")
        print(f"File count: {len(zipf.namelist())}")
        print(f"Compression method: {zipf.compression}")
        
        for info in zipf.infolist():
            print(f"  {info.filename}: {info.file_size} -> {info.compress_size} bytes "
                  f"(ratio {info.file_size/(info.compress_size or 1):.1f}:1)")
    
    # Extract the archive
    extract_dir = 'extracted'
    os.makedirs(extract_dir, exist_ok=True)
    
    with zipfile.ZipFile('example.zip', 'r') as zipf:
        zipf.extractall(extract_dir)
        print(f"\nFiles extracted to: {extract_dir}/")
    
    # Verify the extracted files
    for filename in test_files.keys():
        extracted_path = os.path.join(extract_dir, filename)
        if os.path.exists(extracted_path):
            with open(extracted_path, 'r', encoding='utf-8') as f:
                content = f.read()
            print(f"Verify {filename}: {'OK' if content == test_files[filename] else 'FAILED'}")
    
    # Note: the stdlib zipfile module cannot CREATE encrypted archives.
    # ZipFile.setpassword() only sets the password used when READING an
    # already-encrypted ZIP; see the pyzipper sketch below for writing one.
    
    # Clean up test files
    for filename in test_files.keys():
        if os.path.exists(filename):
            os.remove(filename)
    
    shutil.rmtree(extract_dir, ignore_errors=True)

# Run the example
zip_file_operations()
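
The standard library can only read password-protected ZIPs (via ZipFile.setpassword before extracting); it cannot create them. Writing an encrypted archive needs a third-party package such as pyzipper, which supports AES encryption. A sketch, assuming pyzipper is installed (pip install pyzipper):

import pyzipper

def write_encrypted_zip(archive_path, password, files):
    """files maps archive member names to bytes payloads."""
    with pyzipper.AESZipFile(archive_path, 'w',
                             compression=pyzipper.ZIP_DEFLATED,
                             encryption=pyzipper.WZ_AES) as zf:
        zf.setpassword(password)
        for name, payload in files.items():
            zf.writestr(name, payload)

# write_encrypted_zip('secure.zip', b'secret123', {'document.txt': b'hello'})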

3.2 Advanced ZIP Operations

def advanced_zip_operations():
    """More advanced ZIP archive operations."""
    import io
    
    # Create a large test file
    def create_large_file(filename, size_mb=1):
        """Create a large test file."""
        chunk_size = 1024 * 1024  # 1 MB
        with open(filename, 'w', encoding='utf-8') as f:
            for i in range(size_mb):
                chunk = "x" * chunk_size
                f.write(chunk)
                print(f"Wrote {i+1} MB")
    
    create_large_file('large_file.txt', 2)  # 2 MB file
    
    # Split archiving (simulated: one ZIP per chunk, not a true multi-volume ZIP)
    def split_zip_archive(source_file, chunk_size_mb=1):
        """Archive a file in fixed-size chunks, one ZIP per chunk."""
        chunk_size = chunk_size_mb * 1024 * 1024
        
        part_num = 1
        with open(source_file, 'rb') as src:
            while True:
                chunk_data = src.read(chunk_size)
                if not chunk_data:
                    break
                
                zip_filename = f'archive_part{part_num:03d}.zip'
                with zipfile.ZipFile(zip_filename, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
                    # Wrap the chunk in a BytesIO buffer before writing it out
                    with io.BytesIO(chunk_data) as buffer:
                        zipf.writestr('chunk.dat', buffer.getvalue())
                
                print(f"Created part: {zip_filename} ({len(chunk_data)} bytes)")
                part_num += 1
        
        return part_num - 1
    
    # Test split archiving
    print("Starting split archiving...")
    num_parts = split_zip_archive('large_file.txt', 1)  # 1 MB parts
    print(f"Created {num_parts} parts")
    
    # Merge the parts back together
    def merge_zip_parts(output_file, num_parts):
        """Reassemble the original file from the part archives."""
        with open(output_file, 'wb') as out:
            for i in range(1, num_parts + 1):
                part_file = f'archive_part{i:03d}.zip'
                if os.path.exists(part_file):
                    with zipfile.ZipFile(part_file, 'r') as zipf:
                        # Read this part's chunk and append it
                        with zipf.open('chunk.dat') as chunk_file:
                            chunk_data = chunk_file.read()
                            out.write(chunk_data)
                    print(f"Merged part: {part_file}")
    
    # Test merging
    print("Starting merge...")
    merge_zip_parts('restored_file.txt', num_parts)
    
    # Verify integrity
    original_size = os.path.getsize('large_file.txt')
    restored_size = os.path.getsize('restored_file.txt')
    print(f"Original size: {original_size} bytes")
    print(f"Restored size: {restored_size} bytes")
    print(f"Integrity check: {'OK' if original_size == restored_size else 'FAILED'}")
    
    # ZIP comments and metadata
    with zipfile.ZipFile('metadata.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
        zipf.writestr('test.txt', 'test content')
        
        # Archive-level comment
        zipf.comment = 'This is the ZIP archive comment'.encode('utf-8')
        
        # Per-file comments
        for info in zipf.infolist():
            info.comment = 'file comment'.encode('utf-8')
        
        print("Added ZIP comments and metadata")
    
    # Read the comments back
    with zipfile.ZipFile('metadata.zip', 'r') as zipf:
        print(f"ZIP comment: {zipf.comment.decode('utf-8')}")
        for info in zipf.infolist():
            print(f"File {info.filename} comment: {info.comment.decode('utf-8')}")
    
    # Clean up
    for file in ['large_file.txt', 'restored_file.txt', 'metadata.zip']:
        if os.path.exists(file):
            os.remove(file)
    
    for i in range(1, num_parts + 1):
        part_file = f'archive_part{i:03d}.zip'
        if os.path.exists(part_file):
            os.remove(part_file)

# Run the example
advanced_zip_operations()

4. Compressed Data over the Network

4.1 HTTP Compressed Transfer

def http_compression_transfer():
    """HTTP compressed-transfer example."""
    import requests
    from http.server import HTTPServer, BaseHTTPRequestHandler
    import threading
    import gzip
    import time
    
    # Request handler that serves and accepts gzip-encoded bodies
    class CompressionHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            """Handle GET requests."""
            if self.path == '/compressed':
                # Produce a large response body
                large_data = "Compressed transfer test data\n" * 1000
                compressed_data = gzip.compress(large_data.encode('utf-8'))
                
                self.send_response(200)
                self.send_header('Content-Type', 'text/plain')
                self.send_header('Content-Encoding', 'gzip')
                self.send_header('Content-Length', str(len(compressed_data)))
                self.end_headers()
                
                self.wfile.write(compressed_data)
                print("Sent compressed response")
            
            else:
                self.send_error(404)
        
        def do_POST(self):
            """Handle POST requests (receive compressed data)."""
            if self.path == '/upload':
                content_encoding = self.headers.get('Content-Encoding', '')
                content_length = int(self.headers.get('Content-Length', 0))
                
                if content_encoding == 'gzip':
                    # Read the compressed request body
                    compressed_data = self.rfile.read(content_length)
                    try:
                        decompressed_data = gzip.decompress(compressed_data)
                        received_text = decompressed_data.decode('utf-8')
                        
                        self.send_response(200)
                        self.send_header('Content-Type', 'text/plain')
                        self.end_headers()
                        
                        response = f"Received OK: {len(received_text)} characters"
                        self.wfile.write(response.encode('utf-8'))
                        print(f"Received and decompressed: {len(received_text)} characters")
                    
                    except Exception as e:
                        self.send_error(500, f"Decompression error: {e}")
                else:
                    self.send_error(400, "gzip encoding required")
    
    def start_server():
        """Start the HTTP server."""
        server = HTTPServer(('localhost', 8080), CompressionHandler)
        print("HTTP server listening on port 8080")
        server.serve_forever()
    
    # Run the server in a daemon thread
    server_thread = threading.Thread(target=start_server)
    server_thread.daemon = True
    server_thread.start()
    
    # Give the server a moment to start
    time.sleep(0.1)
    
    # Client-side tests
    def test_client():
        """Exercise the HTTP client."""
        # Download the compressed resource. Note that requests transparently
        # decompresses gzip Content-Encoding, so response.text/response.content
        # already hold the decoded payload; decompressing again would fail.
        response = requests.get('http://localhost:8080/compressed')
        print(f"Download status: {response.status_code}")
        print(f"Content-Encoding: {response.headers.get('Content-Encoding')}")
        print(f"Content-Length: {response.headers.get('Content-Length')}")
        print(f"Decoded content: {len(response.text)} characters")
        
        # To see the raw compressed bytes instead, stream the response and
        # read the underlying connection without decoding:
        raw_response = requests.get('http://localhost:8080/compressed', stream=True)
        raw_bytes = raw_response.raw.read(decode_content=False)
        print(f"Raw compressed body: {len(raw_bytes)} bytes")
        
        # Upload a gzip-compressed body
        large_data = "Upload compression test data\n" * 500
        compressed_data = gzip.compress(large_data.encode('utf-8'))
        
        headers = {
            'Content-Encoding': 'gzip',
            'Content-Type': 'text/plain'
        }
        
        response = requests.post('http://localhost:8080/upload',
                                 data=compressed_data,
                                 headers=headers)
        
        print(f"Upload status: {response.status_code}")
        print(f"Upload result: {response.text}")
    
    # Run the client tests
    test_client()

# Run the example
http_compression_transfer()

4.2 Socket Compressed Transfer

def socket_compression_transfer():
    """Socket compressed-transfer example."""
    import socket
    import threading
    import time
    import zlib
    
    # Streaming compression protocol wrapper
    class CompressionProtocol:
        def __init__(self):
            self.compress_obj = zlib.compressobj()
            self.decompress_obj = zlib.decompressobj()
        
        def compress_data(self, data):
            """Compress a chunk of data."""
            compressed = self.compress_obj.compress(data)
            compressed += self.compress_obj.flush(zlib.Z_FULL_FLUSH)
            return compressed
        
        def decompress_data(self, data):
            """Decompress a chunk of data."""
            decompressed = self.decompress_obj.decompress(data)
            return decompressed
        
        def reset(self):
            """Reset the compression state."""
            self.compress_obj = zlib.compressobj()
            self.decompress_obj = zlib.decompressobj()
    
    # Server thread
    def server_thread():
        """Socket server."""
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.bind(('localhost', 9999))
        server_socket.listen(1)
        
        print("Socket server started, waiting for a connection...")
        conn, addr = server_socket.accept()
        print(f"Connection from: {addr}")
        
        protocol = CompressionProtocol()
        
        try:
            # Receive until the client shuts down its write side
            received_data = b''
            while True:
                chunk = conn.recv(4096)
                if not chunk:
                    break
                received_data += chunk
            
            # Decompress the payload
            decompressed = protocol.decompress_data(received_data)
            text_data = decompressed.decode('utf-8')
            
            print(f"Received and decompressed: {len(text_data)} characters")
            
            # Send a compressed response
            response = f"Received OK: {len(text_data)} characters".encode('utf-8')
            compressed_response = protocol.compress_data(response)
            conn.sendall(compressed_response)
            
        finally:
            conn.close()
            server_socket.close()
    
    # Client
    def client_example():
        """Socket client example."""
        # Give the server a moment to start
        time.sleep(0.1)
        
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client_socket.connect(('localhost', 9999))
        
        protocol = CompressionProtocol()
        
        # Prepare the payload
        large_data = "Socket compressed transfer test data\n" * 1000
        compressed_data = protocol.compress_data(large_data.encode('utf-8'))
        
        print(f"Original data: {len(large_data)} characters")
        print(f"Compressed data: {len(compressed_data)} bytes")
        print(f"Compression ratio: {len(large_data.encode('utf-8')) / len(compressed_data):.2f}:1")
        
        # Send and signal end-of-data
        client_socket.sendall(compressed_data)
        client_socket.shutdown(socket.SHUT_WR)  # done sending
        
        # Receive the response
        response_data = b''
        while True:
            chunk = client_socket.recv(4096)
            if not chunk:
                break
            response_data += chunk
        
        # Decompress the response
        decompressed_response = protocol.decompress_data(response_data)
        response_text = decompressed_response.decode('utf-8')
        
        print(f"Server response: {response_text}")
        
        client_socket.close()
    
    # Start the server thread
    server = threading.Thread(target=server_thread)
    server.start()
    
    # Run the client
    client_example()
    server.join()

# Run the example
socket_compression_transfer()
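
The example above relies on shutdown(SHUT_WR) to signal end-of-stream, which limits each connection to one message per direction. A common alternative is length-prefixed framing, so many compressed messages can share one connection. A sketch under that assumption (the frame layout is this example's convention, not a standard):

import struct
import zlib

def send_frame(sock, payload):
    # Prefix each compressed frame with its 4-byte big-endian length
    frame = zlib.compress(payload)
    sock.sendall(struct.pack('>I', len(frame)) + frame)

def recv_exact(sock, n):
    # Read exactly n bytes or raise
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError('socket closed mid-frame')
        buf += chunk
    return buf

def recv_frame(sock):
    (length,) = struct.unpack('>I', recv_exact(sock, 4))
    return zlib.decompress(recv_exact(sock, length))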

5. Advanced Application Scenarios

5.1 A Compressed Log-Archiving System

def log_compression_system():
    """Compressed log archiving system."""
    import logging
    from logging.handlers import RotatingFileHandler
    
    class CompressedRotatingFileHandler(RotatingFileHandler):
        """Rotating file handler that gzips rotated logs."""
        def __init__(self, filename, **kwargs):
            # Make sure the log directory exists
            os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True)
            super().__init__(filename, **kwargs)
        
        def doRollover(self):
            """Override rollover to compress the rotated file."""
            if self.stream:
                self.stream.close()
                self.stream = None
            
            if self.backupCount > 0:
                # Shift existing compressed backups: .i.gz -> .(i+1).gz
                for i in range(self.backupCount - 1, 0, -1):
                    sfn = self.rotation_filename(f"{self.baseFilename}.{i}.gz")
                    dfn = self.rotation_filename(f"{self.baseFilename}.{i + 1}.gz")
                    if os.path.exists(sfn):
                        if os.path.exists(dfn):
                            os.remove(dfn)
                        os.rename(sfn, dfn)
                
                # Compress the current log into .1.gz
                dfn = self.rotation_filename(self.baseFilename + ".1.gz")
                if os.path.exists(dfn):
                    os.remove(dfn)
                with open(self.baseFilename, 'rb') as f_in:
                    with gzip.open(dfn, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                os.remove(self.baseFilename)
            
            if not self.delay:
                self.stream = self._open()
    
    def setup_logging():
        """Configure the logging system."""
        log_dir = 'logs'
        os.makedirs(log_dir, exist_ok=True)
        
        # Main log file
        main_log = os.path.join(log_dir, 'application.log')
        
        # Configure the handler
        handler = CompressedRotatingFileHandler(
            main_log,
            maxBytes=1024 * 1024,  # 1 MB
            backupCount=5,
            encoding='utf-8'
        )
        
        # Log format
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        
        # Root logger
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)
        root_logger.addHandler(handler)
        
        return root_logger
    
    def generate_log_data():
        """Generate test log data."""
        logger = setup_logging()
        
        # Emit plenty of log lines
        for i in range(1000):
            logger.info(f"Test log message {i}: detailed log content to exercise compression")
            
            if i % 100 == 0:
                logger.error(f"Error log {i}: simulated failure")
        
        print("Log generation done")
        
        # Inspect the generated log files
        log_dir = 'logs'
        if os.path.exists(log_dir):
            files = os.listdir(log_dir)
            print(f"Log files: {files}")
            
            # Look for compressed files
            compressed_files = [f for f in files if f.endswith('.gz')]
            if compressed_files:
                print(f"Compressed log files: {compressed_files}")
                
                # Report compressed file sizes
                for comp_file in compressed_files:
                    filepath = os.path.join(log_dir, comp_file)
                    size = os.path.getsize(filepath)
                    print(f"  {comp_file}: {size} bytes")
    
    # Run the logging test
    generate_log_data()
    
    # Log analysis
    def analyze_compressed_logs():
        """Analyze compressed logs without extracting them."""
        log_dir = 'logs'
        if not os.path.exists(log_dir):
            print("Log directory does not exist")
            return
        
        compressed_files = [f for f in os.listdir(log_dir) if f.endswith('.gz')]
        
        for comp_file in compressed_files:
            filepath = os.path.join(log_dir, comp_file)
            print(f"\nAnalyzing compressed log: {comp_file}")
            
            # Read the compressed log directly
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                line_count = 0
                error_count = 0
                
                for line in f:
                    line_count += 1
                    if 'ERROR' in line:
                        error_count += 1
                
                print(f"  Total lines: {line_count}")
                print(f"  Errors: {error_count}")
                print(f"  Error rate: {(error_count/line_count*100 if line_count > 0 else 0):.1f}%")
    
    # Analyze the logs
    analyze_compressed_logs()
    
    # Clean up
    if os.path.exists('logs'):
        shutil.rmtree('logs')

# Run the example
log_compression_system()

5.2 Compressed Database Backups

def database_backup_compression():
    """Compressed database backup system."""
    import sqlite3
    import json
    import datetime
    
    # Create a sample database
    def create_sample_database():
        """Create the sample database."""
        if os.path.exists('sample.db'):
            os.remove('sample.db')
        
        conn = sqlite3.connect('sample.db')
        cursor = conn.cursor()
        
        # Create tables
        cursor.execute('''
            CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE orders (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                amount REAL,
                status TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        ''')
        
        # Insert sample rows
        users = [
            ('Zhang San', 'zhangsan@example.com'),
            ('Li Si', 'lisi@example.com'),
            ('Wang Wu', 'wangwu@example.com')
        ]
        
        cursor.executemany(
            'INSERT INTO users (name, email) VALUES (?, ?)',
            users
        )
        
        orders = [
            (1, 100.50, 'completed'),
            (1, 200.75, 'pending'),
            (2, 50.25, 'completed'),
            (3, 300.00, 'shipped')
        ]
        
        cursor.executemany(
            'INSERT INTO orders (user_id, amount, status) VALUES (?, ?, ?)',
            orders
        )
        
        conn.commit()
        conn.close()
        
        print("Sample database created")
    
    create_sample_database()
    
    # Back up the database
    def backup_database(db_path, backup_path, compression_format='gzip'):
        """Back up a database to a compressed file."""
        # Read the database contents
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        # List all tables
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        
        backup_data = {}
        
        for table in tables:
            # Table schema
            cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table,))
            schema = cursor.fetchone()[0]
            
            # Table rows
            cursor.execute(f"SELECT * FROM {table}")
            rows = cursor.fetchall()
            
            # Column names
            column_names = [description[0] for description in cursor.description]
            
            backup_data[table] = {
                'schema': schema,
                'columns': column_names,
                'data': rows
            }
        
        conn.close()
        
        # Serialize the backup
        serialized_data = json.dumps(backup_data, ensure_ascii=False, default=str)
        
        # Compress the backup
        if compression_format == 'gzip':
            with gzip.open(backup_path, 'wt', encoding='utf-8') as f:
                f.write(serialized_data)
        elif compression_format == 'bz2':
            import bz2
            with bz2.open(backup_path, 'wt', encoding='utf-8') as f:
                f.write(serialized_data)
        else:
            raise ValueError(f"Unsupported compression format: {compression_format}")
        
        print(f"Database backup complete: {backup_path}")
        
        # Report backup sizes
        original_size = os.path.getsize(db_path)
        compressed_size = os.path.getsize(backup_path)
        print(f"Original size: {original_size} bytes")
        print(f"Compressed size: {compressed_size} bytes")
        print(f"Compression ratio: {original_size/compressed_size:.2f}:1")
    
    # Run the backup
    backup_database('sample.db', 'backup.json.gz')
    
    # Restore the database
    def restore_database(backup_path, db_path, compression_format='gzip'):
        """Restore a database from a compressed backup."""
        if os.path.exists(db_path):
            os.remove(db_path)
        
        # Decompress and load the backup
        if compression_format == 'gzip':
            with gzip.open(backup_path, 'rt', encoding='utf-8') as f:
                backup_data = json.load(f)
        elif compression_format == 'bz2':
            import bz2
            with bz2.open(backup_path, 'rt', encoding='utf-8') as f:
                backup_data = json.load(f)
        else:
            raise ValueError(f"Unsupported compression format: {compression_format}")
        
        # Rebuild the database
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        # Restore tables in dependency order (to satisfy foreign keys)
        table_order = ['users', 'orders']  # ordered by foreign-key dependencies
        
        for table in table_order:
            if table in backup_data:
                # Recreate the table
                cursor.execute(backup_data[table]['schema'])
                
                # Reinsert the rows
                if backup_data[table]['data']:
                    columns = backup_data[table]['columns']
                    placeholders = ', '.join(['?'] * len(columns))
                    
                    insert_sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
                    cursor.executemany(insert_sql, backup_data[table]['data'])
        
        conn.commit()
        conn.close()
        
        print(f"Database restore complete: {db_path}")
        
        # Verify the restore
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        cursor.execute("SELECT COUNT(*) FROM users")
        user_count = cursor.fetchone()[0]
        
        cursor.execute("SELECT COUNT(*) FROM orders")
        order_count = cursor.fetchone()[0]
        
        conn.close()
        
        print(f"Restored users: {user_count}")
        print(f"Restored orders: {order_count}")
    
    # Run the restore
    restore_database('backup.json.gz', 'restored.db')
    
    # Timestamped backups with retention
    def incremental_backup(db_path, backup_dir):
        """Timestamped backup with a simple retention policy."""
        os.makedirs(backup_dir, exist_ok=True)
        
        # Timestamp the backup file
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = os.path.join(backup_dir, f'backup_{timestamp}.json.gz')
        
        # Run the backup
        backup_database(db_path, backup_file)
        
        # Prune old backups (keep the most recent 5)
        backup_files = sorted([f for f in os.listdir(backup_dir) if f.startswith('backup_')])
        if len(backup_files) > 5:
            for old_file in backup_files[:-5]:
                os.remove(os.path.join(backup_dir, old_file))
                print(f"Removed old backup: {old_file}")
    
    # Create a timestamped backup
    incremental_backup('sample.db', 'backups')
    
    # List the backup files
    if os.path.exists('backups'):
        backup_files = os.listdir('backups')
        print(f"\nBackup files: {backup_files}")
        
        # Report backup sizes
        for backup_file in backup_files:
            filepath = os.path.join('backups', backup_file)
            size = os.path.getsize(filepath)
            print(f"  {backup_file}: {size} bytes")
    
    # Clean up
    for file in ['sample.db', 'restored.db', 'backup.json.gz']:
        if os.path.exists(file):
            os.remove(file)
    
    if os.path.exists('backups'):
        shutil.rmtree('backups')

# Run the example
database_backup_compression()
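
The JSON dump above is readable but requires a schema round trip and stringifies exotic types. If a byte-exact snapshot is enough, sqlite3's online backup API (Python 3.7+) combined with gzip is simpler; a sketch, with illustrative file names:

import gzip
import os
import shutil
import sqlite3
import tempfile

def backup_sqlite_gz(db_path, backup_path):
    # Snapshot the live database with the online backup API, then gzip it
    fd, tmp = tempfile.mkstemp(suffix='.db')
    os.close(fd)
    try:
        src = sqlite3.connect(db_path)
        dst = sqlite3.connect(tmp)
        try:
            src.backup(dst)  # consistent snapshot even while in use
        finally:
            src.close()
            dst.close()
        with open(tmp, 'rb') as f_in, gzip.open(backup_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    finally:
        os.remove(tmp)

# backup_sqlite_gz('sample.db', 'sample.db.gz')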

6. Performance Optimization and Error Handling

6.1 Compression Performance Tuning

def compression_performance_optimization():
    """Compression performance optimization strategies."""
    import bz2
    import datetime
    import io
    import json
    import lzma
    import time
    import zlib
    import numpy as np
    import pandas as pd
    
    # Generate test data
    def generate_test_data():
        """Generate several kinds of test data."""
        # Text data
        text_data = "repetitive text content " * 10000
        
        # Numeric data
        numeric_data = np.random.rand(10000).tolist()
        
        # Mixed records
        mixed_data = []
        for i in range(5000):
            mixed_data.append({
                'id': i,
                'name': f'Item_{i}',
                'value': np.random.rand(),
                'timestamp': datetime.datetime.now().isoformat()
            })
        
        return {
            'text': text_data,
            'numeric': numeric_data,
            'mixed': mixed_data
        }
    
    test_datasets = generate_test_data()
    
    # Benchmark the compression formats
    def test_compression_performance(data, data_name):
        """Benchmark compression performance for one dataset."""
        results = []
        
        # Serialize the data
        if isinstance(data, (list, dict)):
            serialized_data = json.dumps(data, ensure_ascii=False)
        else:
            serialized_data = str(data)
        
        binary_data = serialized_data.encode('utf-8')
        print(f"{data_name} data size: {len(binary_data)} bytes")
        
        # Candidate compressors
        compressors = [
            ('gzip', gzip.compress),
            ('bz2', bz2.compress),
            ('lzma', lzma.compress),
            ('zlib', zlib.compress)
        ]
        
        for name, compress_func in compressors:
            # Time compression
            start_time = time.time()
            compressed_data = compress_func(binary_data)
            compress_time = time.time() - start_time
            
            # Time decompression
            start_time = time.time()
            if name == 'gzip':
                decompressed = gzip.decompress(compressed_data)
            elif name == 'bz2':
                decompressed = bz2.decompress(compressed_data)
            elif name == 'lzma':
                decompressed = lzma.decompress(compressed_data)
            elif name == 'zlib':
                decompressed = zlib.decompress(compressed_data)
            
            decompress_time = time.time() - start_time
            
            # Verify data integrity
            original_restored = decompressed.decode('utf-8')
            if isinstance(data, (list, dict)):
                data_restored = json.loads(original_restored)
                is_valid = data == data_restored
            else:
                is_valid = data == original_restored
            
            results.append({
                'format': name,
                'original_size': len(binary_data),
                'compressed_size': len(compressed_data),
                'compression_ratio': len(binary_data) / len(compressed_data),
                'compress_time': compress_time,
                'decompress_time': decompress_time,
                'total_time': compress_time + decompress_time,
                'is_valid': is_valid
            })
        
        return results
    
    # Run the benchmarks
    all_results = {}
    
    for data_name, data in test_datasets.items():
        print(f"\nTesting {data_name} data:")
        results = test_compression_performance(data, data_name)
        all_results[data_name] = results
        
        for result in results:
            print(f"  {result['format']}: {result['compressed_size']} bytes, "
                  f"ratio {result['compression_ratio']:.2f}:1, "
                  f"total time {result['total_time']:.3f}s")
    
    # Build a performance report
    def generate_performance_report(results):
        """Generate a performance report."""
        report_data = []
        
        for data_type, compression_results in results.items():
            for result in compression_results:
                report_data.append({
                    'data_type': data_type,
                    'format': result['format'],
                    'compression_ratio': result['compression_ratio'],
                    'total_time': result['total_time'],
                    'compress_time': result['compress_time'],
                    'decompress_time': result['decompress_time']
                })
        
        df = pd.DataFrame(report_data)
        
        # Summary
        print("\nPerformance summary:")
        summary = df.groupby(['data_type', 'format']).agg({
            'compression_ratio': 'mean',
            'total_time': 'mean'
        }).round(2)
        
        print(summary)
        
        # Recommendations
        best_choices = {}
        for data_type in results.keys():
            type_results = [r for r in results[data_type]]
            best_ratio = max(type_results, key=lambda x: x['compression_ratio'])
            best_speed = min(type_results, key=lambda x: x['total_time'])
            
            best_choices[data_type] = {
                'best_compression': best_ratio['format'],
                'best_speed': best_speed['format']
            }
        
        print("\nRecommended choices:")
        for data_type, choices in best_choices.items():
            print(f"  {data_type}:")
            print(f"    Best compression: {choices['best_compression']}")
            print(f"    Fastest: {choices['best_speed']}")
    
    generate_performance_report(all_results)
    
    # Memory-conscious compression
    def memory_efficient_compression():
        """Memory-friendly compression of a large payload."""
        large_data = "large data content " * 1000000
        print(f"Large data size: {len(large_data)} characters")
        
        # One-shot approach: encodes and compresses the whole payload at once
        start_time = time.time()
        compressed = gzip.compress(large_data.encode('utf-8'))
        traditional_time = time.time() - start_time
        traditional_size = len(compressed)
        
        # Chunked approach: encodes and feeds 1 MB slices at a time,
        # so the full encoded input never exists in memory at once
        start_time = time.time()
        with io.BytesIO() as buffer:
            with gzip.GzipFile(fileobj=buffer, mode='wb') as gz:
                chunk_size = 1024 * 1024  # characters per slice
                for i in range(0, len(large_data), chunk_size):
                    chunk = large_data[i:i + chunk_size]
                    gz.write(chunk.encode('utf-8'))
            
            stream_compressed = buffer.getvalue()
        
        stream_time = time.time() - start_time
        stream_size = len(stream_compressed)
        
        # Note: both variants buffer the compressed OUTPUT here; the win of the
        # chunked variant is avoiding a second full copy of the encoded input.
        print(f"One-shot: {traditional_time:.3f}s, output size: {traditional_size} bytes")
        print(f"Chunked: {stream_time:.3f}s, output size: {stream_size} bytes")
        print(f"Compression ratio: {len(large_data.encode('utf-8')) / traditional_size:.2f}:1")
        print(f"Speed ratio (one-shot/chunked): {traditional_time/stream_time:.2f}x")
    
    memory_efficient_compression()

# Run the example
compression_performance_optimization()

6.2 Error Handling and Recovery

def compression_error_handling():
    """Compression error handling and recovery."""
    import bz2
    import lzma
    
    class SafeCompression:
        """Compression wrapper with fallbacks."""
        def __init__(self):
            self.error_log = []
        
        def safe_compress(self, data, compression_format='gzip'):
            """Compress data, falling back to the raw bytes on failure."""
            try:
                if compression_format == 'gzip':
                    compressed = gzip.compress(data)
                elif compression_format == 'bz2':
                    compressed = bz2.compress(data)
                elif compression_format == 'lzma':
                    compressed = lzma.compress(data)
                else:
                    raise ValueError(f"Unsupported compression format: {compression_format}")
                
                return compressed
                
            except Exception as e:
                self.error_log.append(f"Compression error: {e}")
                # Fall back to storing the data uncompressed
                return data
        
        def safe_decompress(self, data, compression_format='auto'):
            """Decompress data, detecting the format when asked."""
            try:
                # Auto-detect the format from magic bytes
                if compression_format == 'auto':
                    if data.startswith(b'\x1f\x8b'):  # GZIP magic number
                        return gzip.decompress(data)
                    elif data.startswith(b'BZh'):  # BZIP2 magic number
                        return bz2.decompress(data)
                    elif data.startswith(b'\xfd7zXZ'):  # XZ magic number
                        return lzma.decompress(data)
                    else:
                        # Assume the data is uncompressed
                        return data
                else:
                    if compression_format == 'gzip':
                        return gzip.decompress(data)
                    elif compression_format == 'bz2':
                        return bz2.decompress(data)
                    elif compression_format == 'lzma':
                        return lzma.decompress(data)
                    else:
                        raise ValueError(f"Unsupported compression format: {compression_format}")
                        
            except Exception as e:
                self.error_log.append(f"Decompression error: {e}")
                # Try the other formats before giving up
                try:
                    return gzip.decompress(data)
                except Exception:
                    try:
                        return bz2.decompress(data)
                    except Exception:
                        try:
                            return lzma.decompress(data)
                        except Exception:
                            return data  # final fallback: return as-is
        
        def get_errors(self):
            """Return the error log."""
            return self.error_log
        
        def clear_errors(self):
            """Clear the error log."""
            self.error_log = []
    
    # Exercise the wrapper
    compressor = SafeCompression()
    
    # Normal round trip
    test_data = "normal test data".encode('utf-8')
    compressed = compressor.safe_compress(test_data, 'gzip')
    decompressed = compressor.safe_decompress(compressed, 'auto')
    
    print(f"Normal round trip: {test_data == decompressed}")
    print(f"Error log: {compressor.get_errors()}")
    compressor.clear_errors()
    
    # Error path
    invalid_data = b"not valid compressed data"
    try:
        # Deliberately trigger a failure
        decompressed = compressor.safe_decompress(invalid_data, 'gzip')
        print(f"Error-handling test: recovered, result length: {len(decompressed)}")
    except Exception as e:
        print(f"Error-handling test: caught exception {e}")
    
    print(f"Error log: {compressor.get_errors()}")
    
    # File-level error handling
    def safe_file_compression(input_file, output_file, compression_format='gzip'):
        """Compress a file with validation and a fallback."""
        try:
            # Validate the input file
            if not os.path.exists(input_file):
                raise FileNotFoundError(f"Input file does not exist: {input_file}")
            
            # Make sure the output directory exists
            output_dir = os.path.dirname(output_file)
            if output_dir and not os.path.exists(output_dir):
                os.makedirs(output_dir, exist_ok=True)
            
            # Read the input
            with open(input_file, 'rb') as f_in:
                original_data = f_in.read()
            
            # Compress
            if compression_format == 'gzip':
                compressed_data = gzip.compress(original_data)
            elif compression_format == 'bz2':
                compressed_data = bz2.compress(original_data)
            elif compression_format == 'lzma':
                compressed_data = lzma.compress(original_data)
            else:
                raise ValueError(f"Unsupported compression format: {compression_format}")
            
            # Write the output
            with open(output_file, 'wb') as f_out:
                f_out.write(compressed_data)
            
            # Verify by decompressing what was written
            with open(output_file, 'rb') as f_check:
                check_data = f_check.read()
            
            if compression_format == 'gzip':
                decompressed_check = gzip.decompress(check_data)
            elif compression_format == 'bz2':
                decompressed_check = bz2.decompress(check_data)
            else:
                decompressed_check = lzma.decompress(check_data)
            
            if decompressed_check != original_data:
                raise ValueError("Compression verification failed: data mismatch")
            
            return True
            
        except Exception as e:
            print(f"File compression error: {e}")
            # Recovery: keep an uncompressed backup of the input
            try:
                backup_file = output_file + '.backup'
                shutil.copy2(input_file, backup_file)
                print(f"Created backup file: {backup_file}")
                return False
            except Exception as backup_error:
                print(f"Backup creation also failed: {backup_error}")
                return False
    
    # Test file compression
    test_content = "file compression test content".encode('utf-8')
    with open('test_input.txt', 'wb') as f:
        f.write(test_content)
    
    success = safe_file_compression('test_input.txt', 'test_output.gz')
    print(f"File compression result: {'OK' if success else 'FAILED'}")
    
    # Clean up
    for file in ['test_input.txt', 'test_output.gz']:
        if os.path.exists(file):
            os.remove(file)

# Run the example
compression_error_handling()

7. Summary: Best Practices for Compressed File Handling

7.1 Choosing a Technology

| Scenario | Recommendation | Strength | Caveat |
| --- | --- | --- | --- |
| General-purpose compression | GZIP | Well balanced, widely supported | Moderate compression ratio |
| Maximum compression | BZIP2 / LZMA | Very high compression ratio | Slow to compress |
| Network transfer | ZLIB | Stream-friendly | Needs custom framing |
| File archiving | ZIP | Multi-file support, highly portable | Comparatively complex API |
| Real-time compression | GZIP at a low level | Fast compression and decompression | Lower compression ratio |

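For illustration, the table collapses into a small dispatch helper; a sketch, where the scenario keys and the gzip fallback are this example's assumptions rather than any standard API:

import bz2
import gzip
import lzma

# Scenario names from the table mapped to stdlib open()-style entry points
SCENARIO_OPENERS = {
    'general': gzip.open,       # balanced default
    'max_ratio': lzma.open,     # best ratio, slowest
    'high_ratio_alt': bz2.open,
}

def open_compressed(path, scenario='general', mode='rt', **kwargs):
    # Fall back to gzip for unknown scenarios
    opener = SCENARIO_OPENERS.get(scenario, gzip.open)
    return opener(path, mode, **kwargs)

# with open_compressed('data.txt.xz', 'max_ratio') as f:
#     print(f.read())
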
7.2 Core Principles

1. Choose the right compression format: match GZIP, BZIP2/LZMA, ZLIB, or ZIP to the workload using the table in 7.1.

2. Optimize for performance: tune compression levels deliberately and prefer streaming over whole-payload processing for large data.

3. Handle errors and plan for recovery: detect formats defensively and keep fallback paths, as in section 6.2.

4. Manage memory: process data in chunks rather than materializing entire payloads.

5. Stay concurrency-safe: do not share compressobj/decompressobj instances across threads without synchronization.

6. Test and verify: round-trip every compression pipeline and check data integrity (see the sketch below).
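
Principle 6 is cheap to automate; a minimal round-trip check, as a sketch:

import gzip

def verify_roundtrip(data, compress, decompress):
    # What goes in must come back out, byte for byte
    return decompress(compress(data)) == data

assert verify_roundtrip(b"payload " * 100, gzip.compress, gzip.decompress)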

7.3 A Practical Template

def professional_compression_template():
    """
    A production-oriented compression template with error handling,
    performance tracking, and resource management built in.
    """
    import bz2
    import gzip
    import lzma
    import time
    from datetime import datetime
    
    class ProfessionalCompressor:
        def __init__(self, default_format='gzip', default_level=6):
            self.default_format = default_format
            self.default_level = default_level
            self.error_log = []
            self.performance_stats = {
                'compress_operations': 0,
                'decompress_operations': 0,
                'total_bytes_processed': 0
            }
        
        def compress(self, data, format=None, level=None):
            """Compress data safely."""
            format = format or self.default_format
            level = level or self.default_level
            
            try:
                start_time = time.time()
                
                if format == 'gzip':
                    compressed = gzip.compress(data, compresslevel=level)
                elif format == 'bz2':
                    compressed = bz2.compress(data, compresslevel=level)
                elif format == 'lzma':
                    compressed = lzma.compress(data, preset=level)
                else:
                    raise ValueError(f"Unsupported compression format: {format}")
                
                process_time = time.time() - start_time
                
                # Update statistics
                self.performance_stats['compress_operations'] += 1
                self.performance_stats['total_bytes_processed'] += len(data)
                
                return compressed
                
            except Exception as e:
                self.error_log.append({
                    'time': datetime.now().isoformat(),
                    'operation': 'compress',
                    'format': format,
                    'error': str(e)
                })
                raise
        
        def decompress(self, data, format='auto'):
            """Decompress data safely."""
            try:
                start_time = time.time()
                
                if format == 'auto':
                    # Detect the format from magic bytes
                    if data.startswith(b'\x1f\x8b'):
                        result = gzip.decompress(data)
                    elif data.startswith(b'BZh'):
                        result = bz2.decompress(data)
                    elif data.startswith(b'\xfd7zXZ'):
                        result = lzma.decompress(data)
                    else:
                        result = data  # treat as uncompressed
                else:
                    if format == 'gzip':
                        result = gzip.decompress(data)
                    elif format == 'bz2':
                        result = bz2.decompress(data)
                    elif format == 'lzma':
                        result = lzma.decompress(data)
                    else:
                        raise ValueError(f"Unsupported compression format: {format}")
                
                process_time = time.time() - start_time
                
                # Update statistics
                self.performance_stats['decompress_operations'] += 1
                self.performance_stats['total_bytes_processed'] += len(data)
                
                return result
                
            except Exception as e:
                self.error_log.append({
                    'time': datetime.now().isoformat(),
                    'operation': 'decompress',
                    'format': format,
                    'error': str(e)
                })
                raise
        
        def get_stats(self):
            """Return a copy of the statistics."""
            return self.performance_stats.copy()
        
        def get_errors(self):
            """Return a copy of the error log."""
            return self.error_log.copy()
        
        def clear_stats(self):
            """Reset the statistics."""
            self.performance_stats = {
                'compress_operations': 0,
                'decompress_operations': 0,
                'total_bytes_processed': 0
            }
        
        def clear_errors(self):
            """Reset the error log."""
            self.error_log = []
    
    # Usage example
    compressor = ProfessionalCompressor(default_format='gzip', default_level=6)
    
    try:
        # Test data
        test_data = "professional compression test data".encode('utf-8')
        
        # Compress
        compressed = compressor.compress(test_data)
        print(f"Compressed size: {len(compressed)} bytes")
        
        # Decompress
        decompressed = compressor.decompress(compressed)
        print(f"Round trip OK: {test_data == decompressed}")
        
        # Statistics
        stats = compressor.get_stats()
        print(f"Operation stats: {stats}")
        
    except Exception as e:
        print(f"Compression operation failed: {e}")
        errors = compressor.get_errors()
        print(f"Errors: {errors}")

# Run the example
professional_compression_template()

This article has walked through the complete landscape of compressed file handling in Python: from basic GZIP operations to streaming pipelines, and from simple file compression to network transfer, covering the core techniques of the field.

Compressed file handling is a foundational skill in Python development, and mastering it pays off in program performance and data-handling capacity, whether you are building storage systems, network services, or high-performance applications.

Remember that a good implementation is not just functionally correct; it also cares about performance, resource efficiency, and robustness. Choose the technique that fits the actual requirement, and balance capability against complexity.

That concludes this complete guide to reading and writing compressed data files in Python. For more on the topic, see the other related articles on jb51.net (脚本之家)!
