A Complete Guide to Reading and Writing Text Data in Python

Author: Python×CATIA工业智造

This article walks through Python's text reading and writing techniques, from basic file operations to encodings, large files, structured formats, and performance.

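Introduction: The Modern Challenges and Importance of Text Data Processing

In today's data-driven world, text data processing is a core skill every Python developer needs to master.

Python offers powerful text-processing facilities, yet many developers do not use them to their full potential. This article covers the full range of Python text I/O techniques, drawing on the essentials of the Python Cookbook and extending them to engineering scenarios such as encoding handling, large-file operations, regular expressions, and performance optimization.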

1. Basic Text Read/Write Operations

1.1 Basic File Operation Patterns

# Basic file read/write operations
def basic_file_operations():
    """Basic file operation examples."""
    # Write text data
    with open('example.txt', 'w', encoding='utf-8') as f:
        f.write("Hello, World!\n")
        f.write("这是第二行文本\n")  # non-ASCII text, stored as UTF-8
        f.write("Third line with numbers: 123\n")

    # Read the whole file at once
    with open('example.txt', 'r', encoding='utf-8') as f:
        content = f.read()
        print("Full file content:")
        print(content)

    # Read line by line
    with open('example.txt', 'r', encoding='utf-8') as f:
        print("\nLine by line:")
        for i, line in enumerate(f, 1):
            print(f"Line {i}: {line.strip()}")

    # Read all lines into a list
    with open('example.txt', 'r', encoding='utf-8') as f:
        lines = f.readlines()
        print(f"\nAll lines as a list: {lines}")

# Run the example
basic_file_operations()

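1.2 File Modes in Detail and When to Use Them

Python supports several file-opening modes, each suited to a different scenario:

Mode   Description          Typical use
'r'    Read only            Read an existing file (the default mode)
'w'    Write                Create a new file or overwrite an existing one
'a'    Append               Add content at the end of the file
'x'    Exclusive creation   Create a new file; fail if it already exists
'b'    Binary               Non-text files such as images or audio
't'    Text                 Text files (the default)
'+'    Read and write       Allow both reading and writing
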
def advanced_file_modes():
    """Examples of the less common file modes."""
    # Write-and-read mode (w+): truncates the file first, unlike r+
    with open('data.txt', 'w+', encoding='utf-8') as f:
        f.write("Initial content\n")
        f.seek(0)  # Back to the start of the file
        content = f.read()
        print("Content in w+ mode:", content)

    # Append-and-read mode (a+): writes always go to the end of the file
    with open('data.txt', 'a+', encoding='utf-8') as f:
        f.write("Appended content\n")
        f.seek(0)
        content = f.read()
        print("Content after appending:", content)

    # Binary write and read
    with open('binary_data.bin', 'wb') as f:
        f.write(b'\x00\x01\x02\x03\x04\x05')

    with open('binary_data.bin', 'rb') as f:
        binary_content = f.read()
        print("Binary content:", binary_content)

# Run the example
advanced_file_modes()
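
The mode table lists 'x' (exclusive creation), but the example above does not exercise it. The following is a minimal sketch of how it might be used to avoid overwriting an existing file. The helper name `create_exclusively` and the file name `report.txt` are illustrative assumptions, and the demo runs in a temporary directory.

```python
import os
import tempfile

def create_exclusively(path, text):
    """Create `path` only if it does not exist yet; return True on success."""
    try:
        # 'x' raises FileExistsError instead of silently overwriting
        with open(path, 'x', encoding='utf-8') as f:
            f.write(text)
        return True
    except FileExistsError:
        return False

# Demo in a throwaway directory so no real file is touched
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, 'report.txt')  # hypothetical file name
    first = create_exclusively(target, "first version\n")
    second = create_exclusively(target, "second version\n")
    with open(target, 'r', encoding='utf-8') as f:
        kept = f.read()

print(first, second, repr(kept))
```

The second call fails without touching the file, so the first version is what remains on disk. With 'w' the second call would have replaced it.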

2. Encoding and Character Set Issues

2.1 Handling Text Encodings Correctly

import chardet  # third-party package: pip install chardet

def encoding_handling():
    """Write files in several encodings, then detect and decode them."""
    texts = [
        "Hello World",
        "你好世界",
        "こんにちは世界",
        "안녕하세요 세계"
    ]

    # One encoding per text
    encodings = ['utf-8', 'gbk', 'shift_jis', 'euc-kr']

    for i, (text, encoding) in enumerate(zip(texts, encodings)):
        try:
            with open(f'file_{i}.txt', 'w', encoding=encoding) as f:
                f.write(text)
            print(f"Wrote file in {encoding}")
        except UnicodeEncodeError as e:
            print(f"Encoding error: {encoding} - {e}")

    # Detect the encoding, then decode
    for i in range(len(texts)):
        try:
            with open(f'file_{i}.txt', 'rb') as f:
                raw_data = f.read()
            detected = chardet.detect(raw_data)
            encoding = detected['encoding']
            confidence = detected['confidence']

            # Detection is a statistical guess; on short inputs it can be wrong or None
            if encoding is None:
                print(f"file_{i}.txt: encoding could not be detected")
                continue

            print(f"Detected encoding: {encoding} (confidence: {confidence:.2f})")
            content = raw_data.decode(encoding)
            print(f"File content: {content}")

        except FileNotFoundError:
            print(f"File file_{i}.txt does not exist")
        except (UnicodeDecodeError, LookupError) as e:
            print(f"Decoding error: {e}")

encoding_handling()
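
When a third-party detector is not available, one common fallback is to try a short list of candidate encodings in order. The sketch below uses only the standard library. The function name `decode_with_fallback` and the candidate list are assumptions made for illustration, and the order matters because the same bytes can be valid in more than one encoding.

```python
def decode_with_fallback(raw, candidates=('utf-8', 'gbk', 'shift_jis')):
    """Return (text, encoding) for the first candidate that decodes cleanly."""
    for encoding in candidates:
        try:
            return raw.decode(encoding), encoding
        except UnicodeDecodeError:
            continue
    # Last resort: latin-1 maps every byte to a character, so it never fails,
    # but the result may be meaningless
    return raw.decode('latin-1'), 'latin-1'

utf8_text, utf8_enc = decode_with_fallback("你好世界".encode('utf-8'))
gbk_text, gbk_enc = decode_with_fallback("你好世界".encode('gbk'))

print(utf8_enc, utf8_text)
print(gbk_enc, gbk_text)
```

Putting UTF-8 first is a reasonable default because non-UTF-8 data rarely happens to be valid UTF-8, whereas legacy encodings accept many byte sequences that were never written in them.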

2.2 Encoding Conversion and Normalization

def encoding_conversion():
    """Convert files between encodings."""
    # Test text mixing Chinese, English, and Japanese
    text = "中文测试 English Test 日本語テスト"

    # Save it in two different encodings
    with open('text_gbk.txt', 'w', encoding='gbk') as f:
        f.write(text)

    with open('text_utf8.txt', 'w', encoding='utf-8') as f:
        f.write(text)

    # Conversion helper
    def convert_encoding(input_file, output_file, from_encoding, to_encoding):
        """Re-encode a file from one encoding to another."""
        try:
            with open(input_file, 'r', encoding=from_encoding) as f_in:
                content = f_in.read()

            with open(output_file, 'w', encoding=to_encoding) as f_out:
                f_out.write(content)

            print(f"Converted {input_file} from {from_encoding} to {to_encoding}")

        except UnicodeDecodeError:
            print(f"Decoding failed: {input_file} may not be {from_encoding}")
        except UnicodeEncodeError:
            print(f"Encoding failed: content cannot be represented in {to_encoding}")

    # Run the conversions
    convert_encoding('text_gbk.txt', 'text_utf8_from_gbk.txt', 'gbk', 'utf-8')
    convert_encoding('text_utf8.txt', 'text_gbk_from_utf8.txt', 'utf-8', 'gbk')

    # Unicode normalization: the same visible text can be stored in different forms
    import unicodedata

    decomposed = "cafe\u0301"  # 'e' followed by a combining acute accent
    normalized = unicodedata.normalize('NFC', decomposed)  # single precomposed character
    print(f"Original: {decomposed} ({len(decomposed)} code points)")
    print(f"NFC form: {normalized} ({len(normalized)} code points)")

encoding_conversion()
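
The conversion helper above gives up when decoding fails. When partial recovery is acceptable, the `errors` argument of `bytes.decode` (also accepted by `open`) offers a softer option. This is a small sketch with made-up sample bytes and is not part of the original example.

```python
raw = b"abc\xffdef"   # 0xFF can never appear in valid UTF-8

try:
    raw.decode('utf-8')            # default is errors='strict'
    strict_failed = False
except UnicodeDecodeError:
    strict_failed = True

replaced = raw.decode('utf-8', errors='replace')   # bad byte becomes U+FFFD
ignored = raw.decode('utf-8', errors='ignore')     # bad byte is silently dropped

print(strict_failed, repr(replaced), repr(ignored))
```

'replace' is usually the safer of the two because the replacement character leaves a visible trace of the damage, while 'ignore' loses data without any sign.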

3. Advanced File Operation Techniques

3.1 Context Managers and Exception Handling

class SafeFileHandler:
    """File handler that reports errors and always closes the file."""
    def __init__(self, filename, mode='r', encoding='utf-8'):
        self.filename = filename
        self.mode = mode
        self.encoding = encoding
        self.file = None

    def __enter__(self):
        try:
            self.file = open(self.filename, self.mode, encoding=self.encoding)
            return self.file
        except FileNotFoundError:
            print(f"Error: file {self.filename} does not exist")
            raise
        except PermissionError:
            print(f"Error: no permission to access {self.filename}")
            raise
        except Exception as e:
            print(f"Unexpected error while opening the file: {e}")
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.file:
            self.file.close()
        if exc_type:
            print(f"Error during file operation: {exc_val}")
        return False  # Do not suppress the exception

# Usage example
def safe_file_operations():
    """Safe file operation examples."""
    try:
        with SafeFileHandler('example.txt', 'r') as f:
            content = f.read()
            print("Content read safely:", content)
    except Exception as e:
        print(f"Operation failed: {e}")

    # Write operation
    try:
        with SafeFileHandler('output.txt', 'w') as f:
            f.write("This content was written safely\n")
    except Exception as e:
        print(f"Write failed: {e}")

safe_file_operations()
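
The same idea can be written more compactly with `contextlib.contextmanager` from the standard library. The sketch below is an alternative formulation, not the article's own code. The name `safe_open` and the file name `note.txt` are assumptions, and the demo uses a temporary directory.

```python
import os
import tempfile
from contextlib import contextmanager

@contextmanager
def safe_open(filename, mode='r', encoding='utf-8'):
    """Open a text file, report any failure in the body, and always close it."""
    f = open(filename, mode, encoding=encoding)  # errors while opening propagate as-is
    try:
        yield f
    except Exception as e:
        print(f"File operation failed: {e}")
        raise  # re-raise, mirroring `return False` in __exit__
    finally:
        f.close()

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'note.txt')  # hypothetical file name
    with safe_open(path, 'w') as f:
        f.write("written safely\n")
    with safe_open(path) as f:
        content = f.read()
    handle_closed = f.closed

print(repr(content), handle_closed)
```

The generator form keeps setup, error reporting, and cleanup in one function, which tends to be easier to read than a class when no extra state is needed.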

3.2 Working with File Paths

from pathlib import Path

def path_operations():
    """Modern path handling with pathlib."""
    # Current working directory
    current_dir = Path.cwd()
    print(f"Current directory: {current_dir}")

    # Build a file path with the / operator
    file_path = current_dir / 'data' / 'files' / 'example.txt'
    print(f"File path: {file_path}")

    # Create the parent directories
    file_path.parent.mkdir(parents=True, exist_ok=True)

    # Write the file
    file_path.write_text("This content was written with pathlib\n", encoding='utf-8')

    # Read the file
    content = file_path.read_text(encoding='utf-8')
    print(f"File content: {content}")

    # File information
    print(f"Exists: {file_path.exists()}")
    print(f"Is a file: {file_path.is_file()}")
    print(f"Size: {file_path.stat().st_size} bytes")

    # Iterate over a directory
    data_dir = current_dir / 'data'
    print("Directory contents:")
    for item in data_dir.iterdir():
        print(f"  {item.name} - {'file' if item.is_file() else 'directory'}")

    # Find files recursively
    print("Searching for .txt files:")
    for txt_file in data_dir.rglob('*.txt'):
        print(f"  Found: {txt_file}")

path_operations()

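4. Large Files and Memory Optimization

4.1 Streaming Through Large Files

def process_large_file(filename, chunk_size=1024 * 1024):  # about 1M characters per chunk
    """Yield non-empty lines from a large file, reading it in fixed-size chunks."""
    leftover = ''  # partial line carried over from the previous chunk
    with open(filename, 'r', encoding='utf-8') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            # A chunk can end mid-line: keep the last piece for the next round
            lines = (leftover + chunk).split('\n')
            leftover = lines.pop()
            yield from process_chunk(lines)
    # Flush the final line if the file does not end with a newline
    if leftover.strip():
        yield leftover.strip()

def process_chunk(lines):
    """Yield stripped, non-empty lines from a list of complete lines."""
    for line in lines:
        if line.strip():  # Skip blank lines
            yield line.strip()

# Usage example
def large_file_example():
    """Large-file processing example."""
    # Create a sample large file
    with open('large_file.txt', 'w', encoding='utf-8') as f:
        for i in range(100000):
            f.write(f"This is line {i}, with some text content for testing\n")

    # Process the large file
    line_count = 0
    for line in process_large_file('large_file.txt'):
        line_count += 1
        if line_count % 10000 == 0:
            print(f"Processed {line_count} lines")

    print(f"Processed {line_count} lines in total")

large_file_example()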

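4.2 Memory-Mapped Files

import mmap

def memory_mapped_operations():
    """Process a large text file through a memory map."""
    # Create a large text file (non-ASCII sample data, stored as UTF-8)
    with open('large_text.txt', 'w', encoding='utf-8') as f:
        for i in range(100000):
            f.write(f"这是第 {i} 行,包含一些文本内容用于测试内存映射文件操作\n")

    # Open in binary mode: a memory map exposes raw bytes, not decoded text
    with open('large_text.txt', 'rb') as f:
        # Map the whole file read-only
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # Search for specific content; a b"..." literal cannot contain
            # non-ASCII characters, so encode the search string instead
            position = mm.find("第 50000 行".encode('utf-8'))
            if position != -1:
                mm.seek(position)
                line = mm.readline().decode('utf-8')
                print(f"Found (from match to end of line): {line}")

            # Count lines
            line_count = 0
            mm.seek(0)
            while True:
                line = mm.readline()
                if not line:
                    break
                line_count += 1

            print(f"Total lines in file: {line_count}")

            # Read lines one at a time
            mm.seek(0)
            for i in range(5):  # Show only the first 5 lines
                line = mm.readline().decode('utf-8').strip()
                print(f"Line {i+1}: {line}")

memory_mapped_operations()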

5. Structured Text Data

5.1 CSV Files

import csv

def csv_operations():
    """CSV read/write operations."""
    # Write a CSV file
    with open('data.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['姓名', '年龄', '城市'])
        writer.writerow(['张三', 25, '北京'])
        writer.writerow(['李四', 30, '上海'])
        writer.writerow(['王五', 28, '广州'])

    # Read the CSV file
    with open('data.csv', 'r', newline='', encoding='utf-8') as f:
        reader = csv.reader(f)
        header = next(reader)
        print("CSV header:", header)
        for row in reader:
            print(f"Row: {row}")

    # Write a CSV file from dictionaries
    with open('data_dict.csv', 'w', newline='', encoding='utf-8') as f:
        fieldnames = ['name', 'age', 'city']
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerow({'name': '张三', 'age': 25, 'city': '北京'})
        writer.writerow({'name': '李四', 'age': 30, 'city': '上海'})

    # Read rows back as dictionaries
    with open('data_dict.csv', 'r', newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            print(f"Dict row: {row}")

# Run the example
csv_operations()
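
Besides lists and dictionaries, CSV rows can also be read into named tuples, which gives attribute access with less overhead than a dictionary per row. The following is a sketch of that pattern using an in-memory sample. The helper name `read_rows_as_namedtuples` and the sample data are assumptions, and the approach only works when every header is a valid Python identifier.

```python
import csv
import io
from collections import namedtuple

csv_text = "name,age,city\nZhang San,25,Beijing\nLi Si,30,Shanghai\n"

def read_rows_as_namedtuples(f):
    """Read a CSV stream into a list of named tuples keyed by the header row."""
    reader = csv.reader(f)
    Row = namedtuple('Row', next(reader))  # header names become field names
    return [Row(*fields) for fields in reader]

# io.StringIO stands in for an open file here
rows = read_rows_as_namedtuples(io.StringIO(csv_text, newline=''))

for row in rows:
    print(row.name, row.age, row.city)
```

Note that the csv module returns every field as a string, so `row.age` is `'25'` and still needs an explicit `int()` conversion where a number is required.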

5.2 JSON Data

import json

def json_operations():
    """JSON read/write operations."""
    data = {
        "users": [
            {"name": "张三", "age": 25, "hobbies": ["阅读", "游泳"]},
            {"name": "李四", "age": 30, "hobbies": ["音乐", "旅行"]},
            {"name": "王五", "age": 28, "hobbies": ["摄影", "编程"]}
        ],
        "metadata": {
            "created": "2024-01-01",
            "version": "1.0"
        }
    }

    # Write a JSON file; ensure_ascii=False keeps non-ASCII text readable
    with open('data.json', 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    # Read the JSON file
    with open('data.json', 'r', encoding='utf-8') as f:
        loaded_data = json.load(f)
        print("JSON data:", loaded_data)

    # Stream large data as JSON Lines: one JSON document per line
    def generate_large_json():
        """Generate a large number of JSON records."""
        for i in range(1000):
            yield json.dumps({"id": i, "data": f"示例数据 {i}"}, ensure_ascii=False) + '\n'

    # Write the JSON Lines stream
    with open('large_data.jsonl', 'w', encoding='utf-8') as f:
        for item in generate_large_json():
            f.write(item)

    # Read the JSON Lines stream record by record
    with open('large_data.jsonl', 'r', encoding='utf-8') as f:
        for line in f:
            item = json.loads(line.strip())
            if item['id'] % 100 == 0:
                print(f"Processing item: {item}")

json_operations()

6. Advanced Text Processing Techniques

6.1 Regular Expressions

import re

def regex_text_processing():
    """Process text with regular expressions."""
    # Sample text
    text = """
    联系人信息:
    张三: 电话 138-1234-5678, 邮箱 zhangsan@example.com
    李四: 电话 139-8765-4321, 邮箱 lisi@example.com
    王五: 电话 137-5555-6666, 邮箱 wangwu@example.com
    """

    # Extract phone numbers
    phone_pattern = r'\b\d{3}-\d{4}-\d{4}\b'
    phones = re.findall(phone_pattern, text)
    print("Phone numbers:", phones)

    # Extract email addresses ([A-Za-z], not [A-Z|a-z]: inside a character
    # class, | is a literal character rather than alternation)
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    emails = re.findall(email_pattern, text)
    print("Email addresses:", emails)

    # Extract name, phone, and email together
    contact_pattern = r'([\u4e00-\u9fa5]+):\s*电话\s*(\d{3}-\d{4}-\d{4}),\s*邮箱\s*([^\s,]+)'
    contacts = re.findall(contact_pattern, text)
    print("Full contact records:")
    for name, phone, email in contacts:
        print(f"Name: {name}, phone: {phone}, email: {email}")

    # Replace with a regular expression
    replaced_text = re.sub(r'\d{3}-\d{4}-\d{4}', '***-****-****', text)
    print("Text with phone numbers masked:")
    print(replaced_text)

regex_text_processing()
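
When a pattern has several capture groups, named groups together with a precompiled pattern can make the extraction easier to follow. The sketch below rewrites the contact pattern in that style. The constant name `CONTACT_RE` and the two-line sample are assumptions chosen for illustration.

```python
import re

# Compile once and reuse; (?P<name>...) gives each group a readable name
CONTACT_RE = re.compile(
    r'(?P<name>[\u4e00-\u9fa5]+):\s*电话\s*(?P<phone>\d{3}-\d{4}-\d{4}),'
    r'\s*邮箱\s*(?P<email>[^\s,]+)'
)

sample = (
    "张三: 电话 138-1234-5678, 邮箱 zhangsan@example.com\n"
    "李四: 电话 139-8765-4321, 邮箱 lisi@example.com"
)

# finditer yields match objects lazily; groupdict() maps group names to text
contacts = [m.groupdict() for m in CONTACT_RE.finditer(sample)]

for contact in contacts:
    print(contact['name'], contact['phone'], contact['email'])
```

Because each record is a dictionary keyed by group name, later changes to the group order in the pattern do not break the code that consumes the matches.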

6.2 Templates and Dynamic Text Generation

from string import Template

def template_processing():
    """Generate text from templates."""
    # Simple string template
    template = Template("Hello, $name! Your order #$order_id has shipped and should arrive on $delivery_date.")

    message = template.substitute(
        name="张三",
        order_id="12345",
        delivery_date="2024-01-15"
    )
    print("Template message:", message)

    # File-based template
    with open('template.txt', 'w', encoding='utf-8') as f:
        f.write("""
Dear $customer_name,

Thank you for purchasing our product.

Order details:
- Order number: $order_id
- Product: $product_name
- Quantity: $quantity
- Total: ¥$total_price

Estimated shipping date: $ship_date
If you have any questions, please contact: $support_email

Happy shopping!
The $company_name team
""")

    # Load the template from the file
    with open('template.txt', 'r', encoding='utf-8') as f:
        template_content = f.read()

    # Fill in the template
    email_template = Template(template_content)
    email_content = email_template.substitute(
        customer_name="李四",
        order_id="67890",
        product_name="Python programming book",
        quantity=2,
        total_price="199.00",
        ship_date="2024-01-16",
        support_email="support@example.com",
        company_name="卓越图书"
    )

    print("Generated email:")
    print(email_content)

    # Generate messages in bulk
    customers = [
        {"name": "王五", "order_id": "11111", "product": "laptop", "quantity": 1, "price": "5999.00"},
        {"name": "赵六", "order_id": "22222", "product": "smartphone", "quantity": 1, "price": "3999.00"},
    ]

    for customer in customers:
        message = template.substitute(
            name=customer["name"],
            order_id=customer["order_id"],
            delivery_date="2024-01-17"
        )
        print(f"Message for {customer['name']}: {message}")

template_processing()
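
`Template.substitute` raises `KeyError` when a placeholder has no value, which matters in bulk generation where some records may be incomplete. The sketch below contrasts it with `safe_substitute`, which leaves unknown placeholders in place. The template text and the sample record are made up for illustration.

```python
from string import Template

template = Template("Hello, $name! Order #$order_id ships on $ship_date.")
incomplete = {"name": "Zhang San", "order_id": "12345"}  # ship_date is missing

try:
    template.substitute(incomplete)
    raised = False
    missing_key = None
except KeyError as e:
    raised = True
    missing_key = e.args[0]  # name of the placeholder that had no value

# safe_substitute never raises for missing keys; it keeps the placeholder text
partial = template.safe_substitute(incomplete)

print(raised, missing_key)
print(partial)
```

Which one to prefer depends on the use case: `substitute` fails early and loudly, while `safe_substitute` suits multi-pass filling where later steps supply the remaining values.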

7. Performance Optimization and Best Practices

7.1 Text Processing Performance

import functools
import itertools
import time

def timeit(func):
    """Timing decorator."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__} took {end - start:.4f}s")
        return result
    return wrapper

def optimized_text_processing():
    """Compare several ways of reading and stripping every line."""
    # Create test data
    with open('perf_test.txt', 'w', encoding='utf-8') as f:
        for i in range(100000):
            f.write(f"This is test line {i}, with some text content for the performance test\n")

    # Method 1: explicit loop with append
    @timeit
    def method1():
        with open('perf_test.txt', 'r', encoding='utf-8') as f:
            lines = []
            for line in f:
                lines.append(line.strip())
        return lines

    # Method 2: list comprehension
    @timeit
    def method2():
        with open('perf_test.txt', 'r', encoding='utf-8') as f:
            return [line.strip() for line in f]

    # Method 3: generator. It yields from inside the with block; returning a
    # generator expression would hand back an iterator over a closed file
    def method3():
        with open('perf_test.txt', 'r', encoding='utf-8') as f:
            for line in f:
                yield line.strip()

    # Method 4: batches of 1000 lines
    def method4():
        with open('perf_test.txt', 'r', encoding='utf-8') as f:
            while True:
                lines = [line.strip() for line in itertools.islice(f, 1000)]
                if not lines:
                    break
                yield lines

    # Generators do no work until consumed, so time the consumption
    @timeit
    def consume_method3():
        return sum(1 for _ in method3())

    @timeit
    def consume_method4():
        return sum(len(batch) for batch in method4())

    print("Performance test:")
    result1 = method1()
    result2 = method2()
    count3 = consume_method3()
    count4 = consume_method4()

    print(f"Line counts: {len(result1)}, {len(result2)}, {count3}, {count4}")

optimized_text_processing()

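7.2 Memory Usage

def memory_optimization():
    """Compare memory use of eager and streaming processing."""
    # Create a large file
    with open('large_memory_test.txt', 'w', encoding='utf-8') as f:
        for i in range(500000):
            f.write(f"line {i}: a test line with some text content for the memory test\n")

    # Memory-intensive approach (not recommended)
    def memory_intensive():
        with open('large_memory_test.txt', 'r', encoding='utf-8') as f:
            lines = f.readlines()  # Reads every line at once
            processed = [line.upper() for line in lines]
        return processed

    # Memory-friendly approach (recommended)
    def memory_friendly():
        with open('large_memory_test.txt', 'r', encoding='utf-8') as f:
            for line in f:
                yield line.upper()  # Produces one result at a time

    # Measure memory use
    import tracemalloc

    print("Memory usage test:")

    tracemalloc.start()
    # Eager approach: the whole result list is held in memory
    result1 = memory_intensive()
    current, peak = tracemalloc.get_traced_memory()
    print(f"Memory-intensive - current: {current/1024/1024:.2f}MB, peak: {peak/1024/1024:.2f}MB")
    tracemalloc.stop()
    expected_chars = sum(len(line) for line in result1)
    del result1

    tracemalloc.start()
    # Streaming approach: consume the generator without storing the lines;
    # wrapping it in list() would use as much memory as the eager version
    total_chars = sum(len(line) for line in memory_friendly())
    current, peak = tracemalloc.get_traced_memory()
    print(f"Memory-friendly - current: {current/1024/1024:.2f}MB, peak: {peak/1024/1024:.2f}MB")
    tracemalloc.stop()

    # Check that both approaches processed the same amount of text
    print(f"Same total length: {expected_chars == total_chars}")

memory_optimization()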

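8. Best Practices Summary

8.1 Golden Rules of Text Processing

1. Choose the correct file mode
2. Manage memory carefully
3. Handle errors and build in robustness
4. Optimize performance
5. Keep the code maintainable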

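8.2 Practical Advice

def professional_text_processor(input_file, output_file, processing_func):
    """
    Template for a streaming text processor.

    Args:
        input_file: path of the input file
        output_file: path of the output file
        processing_func: takes one line of text and returns the processed line

    Returns:
        True if processing finished, False if an error was reported.
    """
    try:
        with open(input_file, 'r', encoding='utf-8') as infile, \
             open(output_file, 'w', encoding='utf-8') as outfile:

            # Generator expression: lines are processed lazily, one at a time
            processed_lines = (processing_func(line) for line in infile)

            # Write in batches to reduce the number of write calls
            batch_size = 1000
            batch = []

            for processed_line in processed_lines:
                batch.append(processed_line)
                if len(batch) >= batch_size:
                    outfile.writelines(batch)
                    batch = []

            # Write the remaining lines
            if batch:
                outfile.writelines(batch)

    except FileNotFoundError:
        print(f"Error: file {input_file} does not exist")
    except PermissionError:
        print("Error: no permission to access the file")
    except Exception as e:
        print(f"Error during processing: {e}")
    else:
        # Report success only when no exception occurred
        print("Processing finished")
        return True
    return False

# Usage example
def example_processor(line):
    """Sample processing function: upper-case the line and add a prefix."""
    return f"PROCESSED: {line.upper()}"

professional_text_processor('input.txt', 'output.txt', example_processor)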

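Summary: The Text Processing Landscape

This article has covered the main techniques for processing text data in Python: basic file operations, encoding handling, large-file optimization, and structured data formats.

Text processing is a basic but important part of Python programming, and command of these techniques improves both productivity and code quality. They apply equally to small configuration files and to large data pipelines.

Good text-processing code is not only functionally correct but also efficient, robust, and maintainable. Choose the technique that fits the actual requirements, and balance performance against complexity.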
