python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python处理二进制数据

一文详解Python如何处理二进制数据

作者:Python×CATIA工业智造

在现代计算系统中,二进制数据处理是至关重要的核心技术,本文将深入解析Python二进制数据读写技术体系,文中的示例代码讲解详细,需要的小伙伴可以了解下

引言:二进制数据处理的核心价值

在现代计算系统中,二进制数据处理是至关重要的核心技术,广泛应用于文件格式解析、网络协议、内存映射和高性能计算等场景(原文此处引用的2024年数据工程报告数据在转载时缺失)。

Python提供了强大的二进制数据处理能力,但许多开发者未能充分利用其全部潜力。本文将深入解析Python二进制数据读写技术体系,结合Python Cookbook精髓,并拓展文件格式解析、网络协议、内存映射、高性能计算等工程级应用场景。

一、基础二进制读写操作

1.1 基本文件二进制操作

def basic_binary_operations():
    """Write several byte payloads to a file, then read them back piecewise.

    ``binary_data.bin`` is (re)created in the current working directory. It
    holds, in order: six bytes from a hex literal, five bytes built from
    decimal values, three bytes taken from a ``bytearray`` and finally a
    UTF-8 encoded text. Every read result is printed to stdout.
    """
    hex_bytes = b'\x00\x01\x02\x03\x04\x05'      # bytes given as a hex literal
    decimal_bytes = bytes([10, 20, 30, 40, 50])  # bytes built from decimal values
    array_bytes = bytearray([100, 200, 255])     # mutable byte array
    text_data = "Hello, 世界!"

    # Write the raw payloads followed by the encoded text.
    with open('binary_data.bin', 'wb') as f:
        f.write(hex_bytes)
        f.write(decimal_bytes)
        f.write(array_bytes)
        f.write(text_data.encode('utf-8'))

    with open('binary_data.bin', 'rb') as f:
        # Read the whole file in one go.
        all_data = f.read()
        print(f"全部数据: {all_data}")
        print(f"数据长度: {len(all_data)} 字节")

        # Rewind so the same content can be consumed piece by piece.
        f.seek(0)

        first_6_bytes = f.read(len(hex_bytes))
        print(f"前6字节: {first_6_bytes}")

        next_bytes = f.read(len(decimal_bytes))
        print(f"接下来5字节: {next_bytes}")

        remaining = f.read()
        print(f"剩余数据: {remaining}")

        # The remaining data still starts with the raw bytearray payload
        # (which contains 0xFF and is not valid UTF-8); skip it so that only
        # the encoded text is decoded.
        text_bytes = remaining[len(array_bytes):]
        try:
            text_content = text_bytes.decode('utf-8')
            print(f"解码文本: {text_content}")
        except UnicodeDecodeError as e:
            print(f"解码错误: {e}")

# 执行示例
basic_binary_operations()

1.2 字节操作与转换

def byte_manipulation():
    """Show how byte data is created, combined, sliced, mutated and inspected.

    All results are printed to stdout; nothing is returned.
    """
    # Four equivalent ways of obtaining byte data.
    from_ints = bytes([0, 1, 2, 3, 4, 5])
    from_array = bytearray([10, 20, 30, 40, 50])
    from_literal = b'\x00\x01\x02\x03\x04\x05'
    from_text = "Hello".encode('ascii')

    print("字节数据示例:")
    print(f"bytes: {from_ints}")
    print(f"bytearray: {from_array}")
    print(f"字节字面量: {from_literal}")
    print(f"编码文本: {from_text}")

    # Concatenation: the left-most operand is bytes, so the result is bytes.
    combined = from_ints + from_array + from_literal + from_text
    print(f"合并数据: {combined}")
    print(f"合并长度: {len(combined)} 字节")

    # Slicing works exactly like on any other sequence.
    middle = combined[5:10]
    tail = combined[-5:]
    print(f"切片[5:10]: {middle}")
    print(f"切片[-5:]: {tail}")

    # bytes is immutable; copy into a bytearray to edit in place.
    editable = bytearray(combined)
    editable[0] = 255
    # Slice assignment may change the length (5 bytes replaced by 3).
    editable[5:10] = b'\xFF\xFE\xFD'
    print(f"修改后数据: {editable}")

    # Sub-sequence search.
    position = combined.find(b'Hello')
    if position != -1:
        print(f"找到 'Hello' 在位置: {position}")

    # Count occurrences of a single byte value.
    zero_count = combined.count(0)
    print(f"字节0出现次数: {zero_count}")

    # Alternative representations.
    print(f"十六进制: {combined.hex()}")
    print(f"整数列表: {list(combined)}")

byte_manipulation()

二、结构化二进制数据处理

2.1 使用struct模块处理二进制数据

import struct

def struct_module_usage():
    """Pack and unpack structured binary data with the ``struct`` module.

    Demonstrates a simple round trip, a multi-field record stored in
    ``structured.bin`` and a sequence of fixed-size records stored in
    ``multiple_records.bin`` (both in the current working directory).
    Results are printed to stdout.
    """
    # Simple round trip: big-endian unsigned int, float, 4-byte string.
    simple_format = '>I f 4s'
    packed_data = struct.pack(simple_format, 123456, 3.14159, b'TEST')
    print(f"打包数据: {packed_data}")
    print(f"打包长度: {len(packed_data)} 字节")

    print(f"解包数据: {struct.unpack(simple_format, packed_data)}")
    print(f"格式大小: {struct.calcsize(simple_format)} 字节")

    # Larger record: two unsigned ints, a 10-byte string and a double.
    complex_format = '>I I 10s d'
    packed_complex = struct.pack(complex_format, 1001, 2002, b'HelloWorld', 2.71828)
    print(f"复杂打包: {packed_complex}")

    with open('structured.bin', 'wb') as out_file:
        out_file.write(packed_complex)

    with open('structured.bin', 'rb') as in_file:
        fields = struct.unpack(complex_format, in_file.read())
        print(f"文件解包: {fields}")

        # Fixed-width string fields are NUL padded; strip the padding.
        text_data = fields[2].decode('ascii').rstrip('\x00')
        print(f"解码文本: {text_data}")

    # A run of identically shaped records: unsigned short + double.
    record_format = '>H d'
    record_size = struct.calcsize(record_format)
    encoded_records = [struct.pack('>H d', i, i * 1.5) for i in range(5)]

    with open('multiple_records.bin', 'wb') as out_file:
        for encoded in encoded_records:
            out_file.write(encoded)

    with open('multiple_records.bin', 'rb') as in_file:
        # Read one record-sized slice at a time until the file is exhausted.
        raw = in_file.read(record_size)
        while raw:
            print(f"记录: {struct.unpack(record_format, raw)}")
            raw = in_file.read(record_size)

struct_module_usage()

2.2 二进制数据协议实现

def binary_protocol_implementation():
    """Demonstrate a framed binary protocol with an in-memory round trip.

    Four messages are encoded, concatenated as if sent over a stream, then
    decoded again. Progress and results are printed to stdout.
    """
    class BinaryProtocol:
        """Frame = 8-byte big-endian header (type, payload length) + payload."""
        HEADER_FORMAT = '>I I'
        HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

        def __init__(self):
            # Bytes received so far that do not yet form a complete frame.
            self.buffer = bytearray()

        def encode_message(self, msg_type, data):
            """Return one frame; ``str`` payloads are UTF-8 encoded first."""
            payload = data.encode('utf-8') if isinstance(data, str) else data
            return struct.pack(self.HEADER_FORMAT, msg_type, len(payload)) + payload

        def decode_message(self, data):
            """Feed received bytes and return every complete ``(type, data)`` pair.

            Payloads that are valid UTF-8 are returned as ``str``; anything
            else is returned as the raw byte sequence. Incomplete trailing
            data stays buffered for the next call.
            """
            self.buffer.extend(data)
            complete = []

            while True:
                if len(self.buffer) < self.HEADER_SIZE:
                    break  # header not fully received yet

                msg_type, payload_len = struct.unpack(
                    self.HEADER_FORMAT, self.buffer[:self.HEADER_SIZE])
                frame_end = self.HEADER_SIZE + payload_len
                if frame_end > len(self.buffer):
                    break  # payload not fully received yet

                payload = self.buffer[self.HEADER_SIZE:frame_end]
                # Drop the consumed frame from the pending buffer.
                self.buffer = self.buffer[frame_end:]

                try:
                    content = payload.decode('utf-8')
                except UnicodeDecodeError:
                    content = payload  # keep non-text payloads as bytes

                complete.append((msg_type, content))

            return complete

    protocol = BinaryProtocol()

    outgoing = [
        (1, "Hello, World!"),
        (2, "Binary Protocol Test"),
        (3, "结束消息"),
        (4, b'\x00\x01\x02\x03\x04'),  # raw binary payload
    ]

    encoded_packets = []
    for kind, body in outgoing:
        frame = protocol.encode_message(kind, body)
        encoded_packets.append(frame)
        print(f"编码包 {kind}: {frame[:20]}... (长度: {len(frame)})")

    # Simulate a stream transport by gluing all frames together.
    transmitted_data = b''.join(encoded_packets)
    print(f"\n传输数据总长度: {len(transmitted_data)} 字节")

    print("\n解码消息:")
    for kind, body in protocol.decode_message(transmitted_data):
        print(f"类型: {kind}, 数据: {body}")

binary_protocol_implementation()

三、高级二进制处理技术

3.1 内存映射文件处理

import mmap

def memory_mapped_operations():
    """Access binary files through memory maps, read-only and writable.

    Creates ``large_binary.bin`` (1 MiB test pattern) and
    ``mutable_binary.bin`` (1000 bytes) in the current working directory and
    prints what is read from / written to the mappings.
    """
    # Create a 1 MiB file with a repeating 0..255 pattern.
    with open('large_binary.bin', 'wb') as f:
        data = bytes([i % 256 for i in range(1024 * 1024)])
        f.write(data)

    # Read-only mapping: the file only needs to be opened for reading.
    with open('large_binary.bin', 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            print(f"文件大小: {len(mm)} 字节")

            # Random access by index and slice.
            print(f"位置1000的字节: {mm[1000]}")
            print(f"位置5000-5010的切片: {mm[5000:5010]}")

            # Pattern search directly on the mapping.
            pattern = b'\x00\x01\x02\x03'
            position = mm.find(pattern)
            if position != -1:
                print(f"找到模式在位置: {position}")

            # Chunked processing of the mapped content.
            chunk_size = 4096
            for offset in range(0, len(mm), chunk_size):
                chunk = mm[offset:offset + chunk_size]
                if b'\xff' in chunk:
                    print(f"在块 {offset}-{offset+chunk_size} 中找到 0xFF")

    # Writable mapping.
    with open('mutable_binary.bin', 'w+b') as f:
        f.write(b'\x00' * 1000)
        # The write above is still sitting in Python's buffer; without a
        # flush the file is empty on disk and mmap refuses to map it.
        f.flush()

        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE) as mm:
            mm[0:5] = b'HELLO'   # slice assignment at the start
            mm.seek(100)
            mm.write(b'WORLD')   # file-like write at offset 100

            # Push the modified pages back to the file.
            mm.flush()

            # Verify the modifications through the mapping.
            mm.seek(0)
            print(f"修改后开始: {mm.read(10)}")
            mm.seek(100)
            print(f"位置100: {mm.read(5)}")

memory_mapped_operations()

3.2 二进制数据流处理

def binary_stream_processing():
    """Demonstrate buffered, forward-only parsing of a binary stream.

    An in-memory stream is parsed with a small helper class; results are
    printed to stdout.
    """
    import io

    class BinaryStreamProcessor:
        """Forward-only reader that buffers data pulled from ``stream``."""

        def __init__(self, stream):
            self.stream = stream
            self.buffer = bytearray()  # bytes fetched but not yet consumed
            self.position = 0          # number of bytes consumed so far

        def read_bytes(self, size):
            """Consume and return exactly ``size`` bytes.

            Raises:
                EOFError: the stream ended before ``size`` bytes were available.
            """
            # Pull 4 KiB blocks until enough data is buffered or the stream ends.
            while len(self.buffer) < size:
                block = self.stream.read(4096)
                if not block:
                    break
                self.buffer.extend(block)

            if len(self.buffer) < size:
                raise EOFError("无法读取足够的数据")

            taken = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.position += size
            return taken

        def read_until(self, delimiter):
            """Consume up to and including ``delimiter``; return the bytes before it.

            Raises:
                EOFError: the stream ended without the delimiter appearing.
            """
            cut = self.buffer.find(delimiter)
            while cut < 0:
                block = self.stream.read(4096)
                if not block:
                    raise EOFError("未找到分隔符")
                self.buffer.extend(block)
                cut = self.buffer.find(delimiter)

            consumed = cut + len(delimiter)
            before = self.buffer[:cut]
            self.buffer = self.buffer[consumed:]
            self.position += consumed
            return before

        def seek(self, position):
            """Advance to absolute ``position`` by discarding data.

            Raises:
                ValueError: ``position`` lies before the current position.
            """
            if position < self.position:
                raise ValueError("不支持向后定位")

            distance = position - self.position
            if distance > 0:
                self.read_bytes(distance)

        def get_position(self):
            """Return the number of bytes consumed so far."""
            return self.position

    # Test payload: marker, 4000 bytes of pattern data, end marker.
    test_data = b'START' + b'\x00\x01\x02\x03' * 1000 + b'END'
    processor = BinaryStreamProcessor(io.BytesIO(test_data))

    start_marker = processor.read_bytes(5)
    print(f"起始标记: {start_marker}")

    try:
        content = processor.read_until(b'END')
    except EOFError as e:
        print(f"错误: {e}")
    else:
        print(f"内容长度: {len(content)} 字节")
        print(f"最后几个字节: {content[-10:]}")

    print(f"最终位置: {processor.get_position()}")

binary_stream_processing()

四、文件格式处理实战

4.1 PNG文件头解析

def png_file_analysis():
    """Parse the signature and IHDR chunk of a PNG file and print its fields.

    Tries ``example.png`` in the current working directory first. If it does
    not exist, a minimal ``test.png`` (signature + IHDR chunk with a correct
    CRC) is written and parsed instead.
    """
    import zlib

    # Fixed 8-byte signature every PNG file starts with.
    PNG_SIGNATURE = b'\x89PNG\r\n\x1a\n'

    def parse_png_header(file_path):
        """Print every chunk header up to and including IHDR.

        Raises:
            ValueError: the file does not start with the PNG signature.
        """
        with open(file_path, 'rb') as f:
            signature = f.read(8)
            if signature != PNG_SIGNATURE:
                raise ValueError("不是有效的PNG文件")

            print("有效的PNG文件")

            # Chunk layout: 4-byte length, 4-byte type, data, 4-byte CRC.
            while True:
                length_data = f.read(4)
                # Stop on end of file *or* a truncated length field; unpacking
                # fewer than four bytes would raise struct.error.
                if len(length_data) < 4:
                    break

                chunk_length = struct.unpack('>I', length_data)[0]
                chunk_type = f.read(4)
                chunk_data = f.read(chunk_length)
                f.read(4)  # skip the CRC; it is not verified here

                print(f"\n数据块类型: {chunk_type.decode('ascii')}")
                print(f"数据块长度: {chunk_length}")

                if chunk_type == b'IHDR':
                    width, height, bit_depth, color_type, compression, filter_method, interlace_method = \
                        struct.unpack('>IIBBBBB', chunk_data)

                    print("=== IHDR 数据块 ===")
                    print(f"宽度: {width} 像素")
                    print(f"高度: {height} 像素")
                    print(f"位深度: {bit_depth}")
                    print(f"颜色类型: {color_type}")
                    print(f"压缩方法: {compression}")
                    print(f"滤波方法: {filter_method}")
                    print(f"隔行扫描方法: {interlace_method}")

                    break

    try:
        parse_png_header('example.png')
    except FileNotFoundError:
        print("示例PNG文件不存在,创建测试文件...")
        # IHDR: width 100, height 50, bit depth 8, colour type 2,
        # compression 0, filter 0, interlace 0.
        ihdr_type = b'IHDR'
        ihdr_data = struct.pack('>IIBBBBB', 100, 50, 8, 2, 0, 0, 0)
        with open('test.png', 'wb') as f:
            f.write(PNG_SIGNATURE)
            f.write(struct.pack('>I', len(ihdr_data)))
            f.write(ihdr_type)
            f.write(ihdr_data)
            # The chunk CRC covers the type and data fields (not the length).
            f.write(struct.pack('>I', zlib.crc32(ihdr_type + ihdr_data) & 0xFFFFFFFF))

        parse_png_header('test.png')

png_file_analysis()

4.2 自定义二进制格式处理

def custom_binary_format():
    """Write and read back a small custom container format (``.cbf``).

    File layout: 4-byte magic, 12-byte header (file size, record count,
    index offset), the records (2-byte type + 4-byte length + data each) and
    finally an index of 8-byte absolute record offsets. The demo writes
    ``custom_format.cbf`` into the current working directory and prints what
    it reads back.
    """
    class CustomBinaryFormat:
        """Reader/writer for the custom container format."""
        MAGIC_NUMBER = b'CBF\x01'   # magic bytes + format version
        HEADER_FORMAT = '>I I I'    # file size, record count, index offset
        RECORD_HEADER_FORMAT = '>H I'  # record type, data length

        def __init__(self):
            # List of (record_type, data_bytes) tuples.
            self.records = []

        def add_record(self, record_type, data):
            """Queue a record for the next ``write_file`` call."""
            self.records.append((record_type, data))

        def write_file(self, filename):
            """Serialize all queued records to ``filename``."""
            with open(filename, 'wb') as f:
                f.write(self.MAGIC_NUMBER)

                # Reserve room for the header; its values are only known
                # once everything else has been written.
                header_pos = f.tell()
                f.write(b'\x00' * struct.calcsize(self.HEADER_FORMAT))

                record_offsets = []
                for record_type, data in self.records:
                    record_offsets.append(f.tell())
                    f.write(struct.pack(self.RECORD_HEADER_FORMAT, record_type, len(data)))
                    f.write(data)

                # Index of absolute record offsets, 8 bytes each.
                index_offset = f.tell()
                for offset in record_offsets:
                    f.write(struct.pack('>Q', offset))

                # Capture the total size BEFORE seeking back; after the seek
                # tell() would only report the header position.
                file_size = f.tell()

                f.seek(header_pos)
                f.write(struct.pack(self.HEADER_FORMAT,
                                    file_size,
                                    len(self.records),
                                    index_offset))

        def read_file(self, filename):
            """Load all records from ``filename`` into ``self.records``.

            Raises:
                ValueError: the file does not start with the expected magic.
            """
            with open(filename, 'rb') as f:
                magic = f.read(len(self.MAGIC_NUMBER))
                if magic != self.MAGIC_NUMBER:
                    raise ValueError("不是有效的自定义格式文件")

                header_data = f.read(struct.calcsize(self.HEADER_FORMAT))
                file_size, num_records, index_offset = \
                    struct.unpack(self.HEADER_FORMAT, header_data)

                print(f"文件大小: {file_size}")
                print(f"记录数量: {num_records}")

                # Read the offset index located after the records.
                f.seek(index_offset)
                record_offsets = []
                for _ in range(num_records):
                    offset_data = f.read(8)
                    record_offsets.append(struct.unpack('>Q', offset_data)[0])

                record_header_size = struct.calcsize(self.RECORD_HEADER_FORMAT)
                self.records = []
                for offset in record_offsets:
                    f.seek(offset)
                    record_type, data_length = struct.unpack(
                        self.RECORD_HEADER_FORMAT, f.read(record_header_size))
                    data = f.read(data_length)

                    self.records.append((record_type, data))
                    print(f"记录 {record_type}: {len(data)} 字节")

    format_handler = CustomBinaryFormat()

    format_handler.add_record(1, b'Hello, World!')
    format_handler.add_record(2, b'Binary Format Test')
    format_handler.add_record(3, bytes(range(100)))

    format_handler.write_file('custom_format.cbf')
    print("自定义格式文件已写入")

    format_handler.read_file('custom_format.cbf')

    # Show text records as text and everything else as a byte preview.
    for record_type, data in format_handler.records:
        try:
            text_content = data.decode('utf-8')
            print(f"记录 {record_type} (文本): {text_content}")
        except UnicodeDecodeError:
            print(f"记录 {record_type} (二进制): {data[:20]}...")

custom_binary_format()

五、高性能二进制处理

5.1 使用numpy进行高性能二进制处理

import numpy as np

def numpy_binary_processing():
    """Demonstrate raw binary file I/O with NumPy arrays.

    Writes one million random float32 values to ``large_binary_data.bin``,
    reads them back (fully, memory-mapped and chunk-wise), then round-trips
    a small structured array through ``structured_data.bin``. Files are
    created in the current working directory; results go to stdout.
    """
    raw_path = 'large_binary_data.bin'

    # One million random float32 samples.
    samples = np.random.rand(1000000).astype(np.float32)
    print(f"原始数据形状: {samples.shape}")
    print(f"原始数据类型: {samples.dtype}")
    print(f"内存占用: {samples.nbytes / 1024 / 1024:.2f} MB")

    # tofile writes the raw element bytes only (no header, no dtype info).
    samples.tofile(raw_path)
    print("数据已保存到文件")

    reloaded = np.fromfile(raw_path, dtype=np.float32)
    print(f"加载数据形状: {reloaded.shape}")

    print(f"数据一致性: {np.array_equal(samples, reloaded)}")

    # Memory-mapped, read-only view of the same file.
    mapped = np.memmap(raw_path,
                       dtype=np.float32,
                       mode='r',
                       shape=samples.shape)

    print("内存映射访问:")
    print(f"前10个元素: {mapped[:10]}")
    print(f"平均值: {mapped.mean():.6f}")
    print(f"标准差: {mapped.std():.6f}")

    def process_large_file(filename, dtype, chunk_size=100000):
        """Return count/mean/min/max of a raw binary file, read in chunks."""
        seen = 0
        running_sum = 0.0
        lowest = float('inf')
        highest = float('-inf')

        with open(filename, 'rb') as f:
            # fromfile yields an empty array once the file is exhausted.
            block = np.fromfile(f, dtype=dtype, count=chunk_size)
            while len(block) != 0:
                seen += len(block)
                running_sum += block.sum()
                lowest = min(lowest, block.min())
                highest = max(highest, block.max())
                block = np.fromfile(f, dtype=dtype, count=chunk_size)

        average = running_sum / seen if seen > 0 else 0
        return {
            'count': seen,
            'mean': average,
            'min': lowest,
            'max': highest
        }

    stats = process_large_file(raw_path, np.float32)
    print("\n文件统计信息:")
    print(f"数据点数: {stats['count']:,}")
    print(f"平均值: {stats['mean']:.6f}")
    print(f"最小值: {stats['min']:.6f}")
    print(f"最大值: {stats['max']:.6f}")

    # Record layout for the structured-array example.
    record_dtype = np.dtype([
        ('id', 'i4'),
        ('value', 'f8'),
        ('timestamp', 'i8'),
        ('flags', 'u1')
    ])

    rows = [
        (1, 3.14, 1640995200, 0x01),
        (2, 2.718, 1641081600, 0x02),
        (3, 1.618, 1641168000, 0x03)
    ]
    np.array(rows, dtype=record_dtype).tofile('structured_data.bin')

    restored = np.fromfile('structured_data.bin', dtype=record_dtype)
    print("\n结构化数据:")
    for row in restored:
        print(f"ID: {row['id']}, 值: {row['value']:.3f}, "
              f"时间戳: {row['timestamp']}, 标志: 0x{row['flags']:02x}")

numpy_binary_processing()

5.2 并行二进制处理

def _checksum_chunk(args):
    """Worker: read one ``(filename, start, size)`` slice and checksum it.

    Defined at module level because ``ProcessPoolExecutor`` pickles the
    callable it submits, and functions nested inside another function cannot
    be pickled.
    """
    filename, start, size = args
    with open(filename, 'rb') as f:
        f.seek(start)
        data = f.read(size)

    return {
        'start': start,
        'size': size,
        # Sum of all byte values modulo 256.
        'checksum': sum(data) % 256,
        'processed': len(data)
    }


def parallel_binary_processing(filename='large_test_file.bin', size_mb=10, chunk_size_mb=2):
    """Create a random test file and checksum it in parallel worker processes.

    Args:
        filename: Path of the test file to (re)create.
        size_mb: Size of the test file in MiB.
        chunk_size_mb: Size of the slice handled by each task, in MiB.

    Returns:
        dict with ``total_processed`` (bytes read by all successful tasks),
        ``overall_checksum`` (sum of the chunk checksums modulo 256) and
        ``chunk_count`` (number of successful tasks).

    Note:
        On platforms whose default start method re-imports the main module,
        the calling script should be guarded by ``if __name__ == "__main__"``.
    """
    import concurrent.futures
    import os

    def create_test_file(path, size_mb=10):
        """Write ``size_mb`` MiB of random bytes to ``path``."""
        chunk_size = 1024 * 1024  # 1 MiB per write
        with open(path, 'wb') as f:
            for i in range(size_mb):
                f.write(os.urandom(chunk_size))
                print(f"已写入 {(i+1)} MB")

    def parallel_process_file(path, chunk_size_mb=1):
        """Split ``path`` into slices and checksum them in a process pool."""
        # Workers may not share the parent's working directory.
        path = os.path.abspath(path)
        file_size = os.path.getsize(path)
        chunk_size = int(chunk_size_mb * 1024 * 1024)

        tasks = [
            (path, start, min(chunk_size, file_size - start))
            for start in range(0, file_size, chunk_size)
        ]
        print(f"总共 {len(tasks)} 个任务")

        results = []
        with concurrent.futures.ProcessPoolExecutor() as executor:
            future_to_task = {
                executor.submit(_checksum_chunk, task): task
                for task in tasks
            }

            for future in concurrent.futures.as_completed(future_to_task):
                task = future_to_task[future]
                try:
                    result = future.result()
                except Exception as e:
                    # Report the failed slice and keep going with the rest.
                    print(f"处理块 {task[1]} 时出错: {e}")
                else:
                    results.append(result)
                    print(f"完成块 {task[1]}-{task[1]+task[2]},校验和: {result['checksum']}")

        return {
            'total_processed': sum(r['processed'] for r in results),
            'overall_checksum': sum(r['checksum'] for r in results) % 256,
            'chunk_count': len(results)
        }

    create_test_file(filename, size_mb)
    print("测试文件创建完成")

    result = parallel_process_file(filename, chunk_size_mb)
    print("\n并行处理结果:")
    print(f"处理总字节数: {result['total_processed']:,}")
    print(f"总校验和: {result['overall_checksum']}")
    print(f"处理块数: {result['chunk_count']}")
    return result

parallel_binary_processing()

六、网络二进制数据处理

6.1 Socket二进制数据传输

def socket_binary_transfer():
    """Exchange length-prefixed binary messages over a local TCP socket.

    A server thread listens on localhost:9999 and acknowledges every message
    with the number of bytes received; the client sends three text messages
    and one binary message. All traffic is reported on stdout.

    Raises:
        RuntimeError: the server did not start listening within 5 seconds.
        ConnectionError: the server closed the connection before a complete
            response frame arrived.
    """
    import socket
    import threading

    address = ('localhost', 9999)
    # Set by the server once it is listening, so the client does not have to
    # guess with a sleep.
    server_ready = threading.Event()

    class BinaryProtocol:
        """Frames messages as a 4-byte big-endian length followed by the data."""

        def __init__(self):
            # Received bytes that do not yet form a complete frame.
            self.buffer = bytearray()

        def encode_message(self, data):
            """Return ``data`` prefixed with its length."""
            return struct.pack('>I', len(data)) + data

        def decode_messages(self, data):
            """Feed received bytes; return the list of complete payloads."""
            self.buffer.extend(data)
            messages = []

            while len(self.buffer) >= 4:
                length = struct.unpack('>I', self.buffer[:4])[0]
                if len(self.buffer) < 4 + length:
                    break  # wait for the rest of this frame

                messages.append(self.buffer[4:4 + length])
                # Drop the consumed frame in place.
                del self.buffer[:4 + length]

            return messages

    def receive_messages(sock, protocol):
        """Read from ``sock`` until at least one complete frame has arrived.

        TCP is a byte stream: a single recv() may return only part of a
        frame, so keep reading until the protocol yields a message.
        """
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                raise ConnectionError("连接在收到完整响应前关闭")
            messages = protocol.decode_messages(chunk)
            if messages:
                return messages

    def server_thread():
        """Accept one connection and acknowledge every received message."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            # Allow quick restarts while the old socket is in TIME_WAIT.
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(address)
            server_socket.listen(1)

            print("服务器启动,等待连接...")
            server_ready.set()
            conn, addr = server_socket.accept()

            with conn:
                print(f"连接来自: {addr}")
                protocol = BinaryProtocol()

                while True:
                    data = conn.recv(4096)
                    if not data:
                        break  # client closed the connection

                    for msg in protocol.decode_messages(data):
                        try:
                            text = msg.decode('utf-8')
                            print(f"收到消息: {text}")
                        except UnicodeDecodeError:
                            print(f"收到二进制数据: {msg[:20]}...")

                        response = f"已接收 {len(msg)} 字节".encode('utf-8')
                        # sendall retries until the whole frame is written;
                        # send() may transmit only part of it.
                        conn.sendall(protocol.encode_message(response))

    def client_example():
        """Send the demo messages and print the server's acknowledgements."""
        if not server_ready.wait(timeout=5):
            raise RuntimeError("服务器未能在超时时间内启动")

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
            client_socket.connect(address)
            protocol = BinaryProtocol()

            messages = [
                "Hello, Server!",
                "这是文本消息",
                "结束通信"
            ]

            for msg in messages:
                client_socket.sendall(protocol.encode_message(msg.encode('utf-8')))
                for resp in receive_messages(client_socket, protocol):
                    print(f"服务器响应: {resp.decode('utf-8')}")

            # Raw binary payload.
            binary_data = bytes(range(100))
            client_socket.sendall(protocol.encode_message(binary_data))
            for resp in receive_messages(client_socket, protocol):
                print(f"二进制响应: {resp.decode('utf-8')}")

    server = threading.Thread(target=server_thread)
    server.daemon = True
    server.start()

    client_example()
    # The server loop ends once the client has closed its socket.
    server.join()

socket_binary_transfer()

6.2 HTTP二进制数据传输

def http_binary_transfer():
    """Upload and download binary data over HTTP against a local test server.

    Starts an ``HTTPServer`` on localhost:8000 in a background thread, posts
    600 bytes to it (the server answers with the SHA-256 of the body),
    downloads 1024 bytes from ``/download`` into ``downloaded.bin`` and
    finally shuts the server down again.
    """
    import hashlib
    import threading
    from http.server import HTTPServer, BaseHTTPRequestHandler

    import requests

    class BinaryHandler(BaseHTTPRequestHandler):
        """Request handler for the upload/download demo."""

        def do_POST(self):
            """Read the binary request body and reply with its SHA-256."""
            content_length = int(self.headers.get('Content-Length', 0))
            binary_data = self.rfile.read(content_length)

            print(f"收到 {len(binary_data)} 字节数据")

            sha256_hash = hashlib.sha256(binary_data).hexdigest()
            response_data = f"SHA256: {sha256_hash}".encode('utf-8')

            self.send_response(200)
            self.send_header('Content-Type', 'application/octet-stream')
            # Tell the client exactly how many body bytes to expect.
            self.send_header('Content-Length', str(len(response_data)))
            self.end_headers()

            self.wfile.write(response_data)

        def do_GET(self):
            """Serve a generated binary file at ``/download``; 404 otherwise."""
            if self.path == '/download':
                test_data = bytes([i % 256 for i in range(1024)])

                self.send_response(200)
                self.send_header('Content-Type', 'application/octet-stream')
                self.send_header('Content-Disposition', 'attachment; filename="test.bin"')
                self.send_header('Content-Length', str(len(test_data)))
                self.end_headers()

                self.wfile.write(test_data)
            else:
                self.send_error(404)

    def test_client():
        """Exercise the upload and download endpoints."""
        test_data = b'\x00\x01\x02\x03\x04\x05' * 100  # 600 bytes

        response = requests.post('http://localhost:8000/', data=test_data, timeout=10)
        print(f"上传响应: {response.text}")

        response = requests.get('http://localhost:8000/download', timeout=10)
        if response.status_code == 200:
            with open('downloaded.bin', 'wb') as f:
                f.write(response.content)
            print(f"下载完成: {len(response.content)} 字节")

            # Re-read the file to verify what was stored.
            with open('downloaded.bin', 'rb') as f:
                downloaded_data = f.read()
                print(f"验证下载: {downloaded_data[:10]}...")
        else:
            print(f"下载失败: {response.status_code}")

    # The constructor binds and listens, so once it returns the port accepts
    # connections - no sleep is needed before the client starts.
    server = HTTPServer(('localhost', 8000), BinaryHandler)
    print("HTTP服务器启动在端口 8000")

    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()

    try:
        test_client()
    finally:
        # Stop serve_forever and release the port even if the client failed.
        server.shutdown()
        server.server_close()
        server_thread.join()

http_binary_transfer()

七、错误处理与最佳实践

7.1 二进制数据错误处理

def binary_error_handling():
    """Demonstrate defensive handling of errors around binary data.

    Uses ``example.bin`` in the current working directory (created on demand)
    and prints the outcome of every step to stdout.
    """
    class SafeBinaryHandler:
        """Helpers that report failures instead of propagating them."""

        def __init__(self):
            self.buffer = bytearray()

        def read_binary_file(self, filename):
            """Return the file content, or ``None`` (after printing) on failure."""
            try:
                with open(filename, 'rb') as f:
                    content = f.read()
            except FileNotFoundError:
                print(f"错误: 文件 {filename} 不存在")
                return None
            except PermissionError:
                print(f"错误: 没有权限读取 {filename}")
                return None
            except Exception as e:
                print(f"读取文件时发生未知错误: {e}")
                return None
            return content

        def parse_binary_data(self, data, format_string):
            """Unpack ``data``; return ``None`` (after printing) on a format mismatch."""
            try:
                fields = struct.unpack(format_string, data)
            except struct.error as e:
                print(f"解析错误: {e}")
                print(f"数据长度: {len(data)},需要: {struct.calcsize(format_string)}")
                return None
            return fields

        def validate_data_size(self, data, expected_size):
            """Return ``True`` if ``data`` has ``expected_size`` bytes.

            Raises:
                ValueError: the sizes differ.
            """
            if len(data) == expected_size:
                return True
            raise ValueError(f"数据大小不匹配: 期望 {expected_size}, 实际 {len(data)}")

        def safe_file_operations(self, operations):
            """Run each callable; collect its result, or ``None`` if it raised."""
            outcomes = []
            for operation in operations:
                try:
                    outcome = operation()
                except Exception as e:
                    print(f"操作失败: {e}")
                    outcome = None
                outcomes.append(outcome)
            return outcomes

    handler = SafeBinaryHandler()

    # Read the sample file, creating it first when it is missing.
    data = handler.read_binary_file('example.bin')
    if data is None:
        print("创建测试文件...")
        data = b'\x00\x01\x02\x03\x04\x05'
        with open('example.bin', 'wb') as f:
            f.write(data)
        data = handler.read_binary_file('example.bin')

    if data:
        # Matching format: six unsigned bytes.
        parsed = handler.parse_binary_data(data, '6B')
        print(f"正确解析: {parsed}")

        # Mismatching format: 'I' needs exactly four bytes.
        if handler.parse_binary_data(data, 'I') is None:
            print("解析失败已正确处理")

        try:
            handler.validate_data_size(data, 6)
        except ValueError as e:
            print(f"验证错误: {e}")
        else:
            print("数据大小验证通过")

    # A batch in which every operation fails in a different way.
    operations = [
        lambda: handler.read_binary_file('nonexistent.bin'),
        lambda: handler.parse_binary_data(b'\x00', 'I'),
        lambda: handler.validate_data_size(b'test', 10)
    ]

    results = handler.safe_file_operations(operations)
    print(f"批量操作结果: {results}")

binary_error_handling()

7.2 二进制数据处理最佳实践

def binary_best_practices():
    """Demonstrate four best practices for binary data handling.

    Defines and then exercises, in order:

    1. ``safe_binary_io`` - a context manager wrapping ``open``.
    2. ``validate_binary_data`` - size / signature validation.
    3. ``process_large_binary`` - chunked reading of a file.
    4. ``TypedBinaryData`` - a thin wrapper that decodes bytes via ``struct``.

    Side effects: writes ``example.bin`` in the current working directory
    and prints the outcome of each step to stdout. Relies on the ``struct``
    module being imported at module level.
    """
    # Imported locally so this snippet does not depend on a file-level import.
    from contextlib import contextmanager

    # 1. Use a context manager
    @contextmanager
    def safe_binary_io(filename, mode='rb'):
        """Open ``filename`` in ``mode`` and yield the file object.

        Without ``@contextmanager`` this would be a plain generator function
        and could not be used in a ``with`` statement. Any exception raised
        while opening the file or inside the ``with`` body is printed and
        then re-raised to the caller.
        """
        try:
            with open(filename, mode) as f:
                yield f
        except Exception as e:
            print(f"文件操作错误: {e}")
            raise

    # 2. Data validation function
    def validate_binary_data(data, expected_properties):
        """Validate ``data`` against the optional ``size`` / ``signature`` keys.

        Args:
            data: Bytes-like object to check.
            expected_properties: Dict that may contain ``'size'`` (exact
                length) and/or ``'signature'`` (required leading bytes).

        Returns:
            True when every requested check passes.

        Raises:
            ValueError: On a size or signature mismatch.
        """
        if 'size' in expected_properties:
            if len(data) != expected_properties['size']:
                raise ValueError("数据大小不匹配")

        if 'signature' in expected_properties:
            signature = expected_properties['signature']
            if data[:len(signature)] != signature:
                raise ValueError("数据签名不匹配")

        return True

    # 3. Memory-efficient processing
    def process_large_binary(filename, chunk_size=8192, processor=None):
        """Read ``filename`` in ``chunk_size``-byte chunks and process each one.

        Only one chunk is read at a time, but every processor result is kept
        in the returned list, so the default identity processor still ends
        up holding the whole file.

        Returns:
            List of ``processor(chunk)`` results, in file order.
        """
        if processor is None:
            processor = lambda chunk: chunk  # default: pass the chunk through

        results = []
        with open(filename, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:  # empty read means end of file
                    break
                results.append(processor(chunk))
        return results

    # 4. Type-safe binary operations
    class TypedBinaryData:
        """Bytes paired with the ``struct`` format used to decode them."""

        def __init__(self, data, dtype='B'):
            """Store ``data`` and its ``struct`` format string ``dtype``.

            Raises:
                TypeError: If ``data`` is not ``bytes`` or ``bytearray``.
            """
            if not isinstance(data, (bytes, bytearray)):
                raise TypeError("必须是字节数据")
            self.data = data
            self.dtype = dtype

        def as_int(self):
            """Return the first value unpacked from ``data`` using ``dtype``.

            Raises:
                ValueError: If ``struct.unpack`` rejects the data/format pair.
            """
            try:
                return struct.unpack(self.dtype, self.data)[0]
            except struct.error as exc:
                # Chain the cause so the underlying struct message is kept.
                raise ValueError("无法转换为指定类型") from exc

        def as_bytes(self):
            """Return the wrapped bytes unchanged."""
            return self.data

        def validate_size(self):
            """Check that ``data`` is exactly as long as ``dtype`` requires.

            Returns:
                True when the length matches ``struct.calcsize(dtype)``.

            Raises:
                ValueError: On a length mismatch.
            """
            expected_size = struct.calcsize(self.dtype)
            if len(self.data) != expected_size:
                raise ValueError(f"数据大小应为 {expected_size} 字节")
            return True

    # Usage examples
    print("最佳实践示例:")

    # Context manager
    try:
        with safe_binary_io('example.bin', 'wb') as f:
            f.write(b'test data')
        print("1. 上下文管理器使用成功")
    except Exception as e:
        print(f"1. 错误: {e}")

    # Data validation
    try:
        validate_binary_data(b'test', {'size': 4, 'signature': b'test'})
        print("2. 数据验证成功")
    except ValueError as e:
        print(f"2. 验证错误: {e}")

    # Memory-efficient processing
    def simple_processor(chunk):
        return len(chunk)  # report the chunk length

    chunk_sizes = process_large_binary('example.bin', processor=simple_processor)
    print(f"3. 块处理结果: {chunk_sizes}")

    # Type-safe operation
    try:
        # '<I' pins little-endian so the bytes decode to 1 on every platform;
        # a bare 'I' would use the machine's native byte order.
        typed_data = TypedBinaryData(b'\x01\x00\x00\x00', '<I')
        typed_data.validate_size()
        value = typed_data.as_int()
        print(f"4. 类型安全值: {value}")
    except Exception as e:
        print(f"4. 类型安全错误: {e}")

binary_best_practices()

八、总结:二进制数据处理技术全景

8.1 技术选型矩阵

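| 场景 | 推荐方案 | 优势 | 注意事项 |
| --- | --- | --- | --- |
| 基础操作 | 直接文件IO | 简单直接 | 内存使用 |
| 结构化数据 | struct模块 | 类型安全 | 格式定义 |
| 大型文件 | 内存映射 | 高效随机访问 | 系统限制 |
| 高性能计算 | numpy | 向量化操作 | 内存占用 |
| 网络传输 | 自定义协议 | 灵活可控 | 协议设计 |
| 错误处理 | 异常处理 | 健壮性 | 性能开销 |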

8.2 核心原则总结

​理解数据格式​​:

​选择合适工具​​:

​性能优化​​:

​错误处理​​:

​内存管理​​:

​跨平台考虑​​:

8.3 实战建议

def professional_binary_processing():
    """Template for a complete binary-file processing run.

    Practices illustrated:
    1. Context managers for every file handle
    2. Input validation
    3. Error handling
    4. Resource cleanup

    Running it writes ``test_input.bin`` and ``test_output.bin`` in the
    current directory and prints progress to stdout.
    """
    def process_binary_file(input_path, output_path=None, processor=None):
        """Read ``input_path``, transform the bytes, optionally save them.

        Args:
            input_path: Path of the file to read.
            output_path: Where to write the result; nothing is written when
                this is falsy.
            processor: Callable applied to the whole file content; defaults
                to returning the content unchanged.

        Returns:
            The processed data.

        Raises:
            FileNotFoundError: If ``input_path`` does not exist.
            ValueError: If the input file is empty.
        """
        # Validate the input before touching it
        if not os.path.exists(input_path):
            raise FileNotFoundError(f"输入文件不存在: {input_path}")

        transform = (lambda x: x) if processor is None else processor

        try:
            with open(input_path, 'rb') as source:
                raw = source.read()

            if not raw:
                raise ValueError("输入文件为空")

            outcome = transform(raw)

            # Persist only when a destination was given
            if output_path:
                with open(output_path, 'wb') as sink:
                    sink.write(outcome)
                print(f"结果已保存到: {output_path}")

            return outcome

        except Exception as e:
            print(f"处理过程中发生错误: {e}")
            # Logging or a retry could be added here
            raise

    def example_processor(data):
        """Sample processor: XOR every byte with a fixed one-byte key."""
        xor_key = 0x55
        return bytes([value ^ xor_key for value in data])

    # Usage example
    try:
        # Create the test input file
        with open('test_input.bin', 'wb') as f:
            f.write(b'\x00\x01\x02\x03\x04\x05')

        # Process it
        result = process_binary_file(
            'test_input.bin',
            output_path='test_output.bin',
            processor=example_processor,
        )

        print(f"处理完成,结果长度: {len(result)} 字节")

    except Exception as e:
        print(f"操作失败: {e}")

professional_binary_processing()

通过本文的全面探讨,我们深入了解了Python二进制数据处理的完整技术体系。从基础文件操作到高级内存映射,从简单字节处理到复杂协议实现,我们覆盖了二进制数据处理领域的核心知识点。

二进制数据处理是Python系统开发中的基础且重要的技能,掌握这些技术将大大提高您的程序性能和处理能力。无论是开发文件格式解析器、实现网络协议,还是进行高性能计算,这些技术都能为您提供强大的支持。

记住,优秀的二进制数据处理实现不仅关注功能正确性,更注重性能、健壮性和可维护性。始终根据具体需求选择最适合的技术方案,在功能与复杂度之间找到最佳平衡点。

到此这篇关于一文详解Python如何处理二进制数据的文章就介绍到这了,更多相关Python处理二进制数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文