A Detailed Guide to Reading and Writing Files with Python's csv Module

Author: Python游侠

This article covers the basic concepts, syntax, and core features of CSV, then works through practical examples of reading, analyzing, cleaning, transforming, merging, and splitting data with Python's csv module. It also introduces advanced techniques, performance optimizations, and a hands-on toolkit, along with solutions to common problems and guidance on choosing alternatives.

1. Core Concepts

1.1 The Basics: What Exactly Is CSV?

CSV stands for "Comma-Separated Values", but it holds a small secret: the separator is not always a comma.

Take a look at these common CSV variants:

# Typical CSV format
name,age,city
张三,30,北京
李四,25,上海

# Semicolon-delimited (common in Europe)
name;age;city
张三;30;北京
李四;25;上海

# Tab-delimited (a TSV file)
name	age	city
张三	30	北京
李四	25	上海

# Quoted CSV (needed when values contain commas)
product,price,description
"苹果,红富士",5.8,"新鲜,甜"
香蕉,3.2,"进口,黄色"

At its core, CSV is a plain-text table made up of three parts:

  1. Header row (optional): defines the name of each column
  2. Data rows: one record per line
  3. Delimiter: usually a comma, but possibly another character; when in doubt, the standard library can often detect it for you, as the sketch below shows
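When you are not sure which delimiter a file uses, the standard library's csv.Sniffer can usually guess it from a sample of the text. A minimal sketch; 'unknown.csv' is a hypothetical input file:

import csv

with open('unknown.csv', 'r', encoding='utf-8', newline='') as f:
    # Sniff the dialect (delimiter, quoting) from the first few KB;
    # sniff() raises csv.Error if it cannot decide.
    dialect = csv.Sniffer().sniff(f.read(4096), delimiters=',;\t')
    f.seek(0)
    for row in csv.reader(f, dialect):
        print(row)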

1.2 Basic Syntax: The Four Pillars of the csv Module

The csv module revolves around four classes and functions:

import csv

# 1. Reading a CSV file
with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file)
    for row in reader:
        print(row)  # each row is a list

# 2. Writing a CSV file
data = [
    ['name', 'age', 'city'],
    ['张三', '30', '北京'],
    ['李四', '25', '上海']
]

with open('output.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)

# 3. Reading as dictionaries (recommended!)
with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(row['name'], row['age'])  # access fields by column name

# 4. Writing from dictionaries
data = [
    {'name': '张三', 'age': '30', 'city': '北京'},
    {'name': '李四', 'age': '25', 'city': '上海'}
]

with open('output_dict.csv', 'w', encoding='utf-8', newline='') as file:
    fieldnames = ['name', 'age', 'city']
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()  # write the header row
    writer.writerows(data)

Note the newline='' argument: without it on Windows, the \r\n line endings written by the csv module get translated to \r\r\n by the text layer, so every record appears to be followed by a blank line. This is a known quirk of Python's text-file handling; always open CSV files with newline='' and you will never hit it.

1.3 Core Features: Why Choose the csv Module?

  1. Zero dependencies: ships with Python, nothing to install
  2. Memory friendly: streams rows, so even huge files are no problem
  3. Flexible configuration: delimiter, quote character, and encoding are all customizable (see the dialect sketch below)
  4. Easy to use: a few lines of code can accomplish complex tasks
  5. Good compatibility: copes with all sorts of "quirky" CSV files
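As a quick illustration of point 3, a set of formatting options can be bundled into a named, reusable dialect with csv.register_dialect; the dialect name 'semicolon' here is just an example:

import csv

# Register a reusable dialect: semicolon-delimited, minimal quoting.
csv.register_dialect('semicolon', delimiter=';', quotechar='"',
                     quoting=csv.QUOTE_MINIMAL)

with open('output_semicolon.csv', 'w', encoding='utf-8', newline='') as f:
    writer = csv.writer(f, dialect='semicolon')
    writer.writerow(['name', 'age', 'city'])
    writer.writerow(['张三', '30', '北京'])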

2. Application Scenarios in Detail

2.1 Reading and Analyzing Data

Suppose you have a sales data file, sales.csv:

date,product,quantity,price,region
2023-01-15,Widget-A,10,29.99,North
2023-01-15,Gadget-X,5,99.99,South
2023-01-16,Widget-A,8,29.99,East
2023-01-16,Thingy-B,3,149.99,West

Let's analyze it with the csv module:

# analyze_sales.py
import csv
from collections import defaultdict

def analyze_sales_data(filepath):
    """
    Analyze the sales data.
    """
    # Accumulators for the statistics
    stats = {
        'total_sales': 0,
        'total_quantity': 0,
        'by_product': defaultdict(float),
        'by_region': defaultdict(float),
        'by_date': defaultdict(float)
    }
    
    with open(filepath, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        
        for row in reader:
            try:
                # Convert field types
                quantity = int(row['quantity'])
                price = float(row['price'])
                date_str = row['date']
                
                # Compute revenue
                sales = quantity * price
                
                # Update the running totals
                stats['total_sales'] += sales
                stats['total_quantity'] += quantity
                stats['by_product'][row['product']] += sales
                stats['by_region'][row['region']] += sales
                stats['by_date'][date_str] += sales
                
            except (ValueError, KeyError) as e:
                print(f"Malformed row: {row}, error: {e}")
                continue
    
    return stats

def print_statistics(stats):
    """
    Print the statistics.
    """
    print("=" * 50)
    print("Sales Data Analysis Report")
    print("=" * 50)
    
    print(f"\nTotal sales: ${stats['total_sales']:.2f}")
    print(f"Total quantity sold: {stats['total_quantity']}")
    
    print("\nBy product:")
    for product, sales in sorted(stats['by_product'].items(), 
                                 key=lambda x: x[1], reverse=True):
        print(f"  {product}: ${sales:.2f}")
    
    print("\nBy region:")
    for region, sales in sorted(stats['by_region'].items(),
                                key=lambda x: x[1], reverse=True):
        print(f"  {region}: ${sales:.2f}")
    
    print("\nBy date:")
    for date, sales in sorted(stats['by_date'].items()):
        print(f"  {date}: ${sales:.2f}")

# Generate test data
def create_sample_data():
    """Create sample sales data."""
    data = [
        ['date', 'product', 'quantity', 'price', 'region'],
        ['2023-01-15', 'Widget-A', 10, 29.99, 'North'],
        ['2023-01-15', 'Gadget-X', 5, 99.99, 'South'],
        ['2023-01-16', 'Widget-A', 8, 29.99, 'East'],
        ['2023-01-16', 'Thingy-B', 3, 149.99, 'West'],
        ['2023-01-16', 'Gadget-X', 12, 99.99, 'North'],
        ['2023-01-17', 'Widget-A', 15, 27.50, 'South'],
    ]
    
    with open('sales.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(data)
    
    print("Sample data created: sales.csv")

if __name__ == "__main__":
    # Create the test data
    create_sample_data()
    
    # Analyze it
    stats = analyze_sales_data('sales.csv')
    
    # Print the results
    print_statistics(stats)

2.2 Data Cleaning and Transformation

Real-world data is rarely perfect. Let's see how to handle various kinds of "dirty" data:

# clean_data.py
import csv
import re
from datetime import datetime

def clean_csv_file(input_file, output_file):
    """
    Clean a CSV file.
    """
    cleaned_rows = []
    
    with open(input_file, 'r', encoding='utf-8') as infile:
        reader = csv.DictReader(infile)
        fieldnames = reader.fieldnames
        
        for i, row in enumerate(reader, 1):
            cleaned_row = {}
            
            for field in fieldnames:
                original_value = row.get(field, '')
                cleaned_value = clean_field(field, original_value)
                cleaned_row[field] = cleaned_value
            
            # Keep only rows that still carry the required data
            if is_valid_row(cleaned_row):
                cleaned_rows.append(cleaned_row)
            else:
                print(f"Skipping invalid row {i}: {row}")
    
    # Write out the cleaned data
    with open(output_file, 'w', encoding='utf-8', newline='') as outfile:
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(cleaned_rows)
    
    print(f"Cleaning complete: {len(cleaned_rows)} valid rows")

def clean_field(field_name, value):
    """
    Clean a value according to its field name.
    """
    if not value:
        return value
    
    # Strip leading/trailing whitespace
    value = value.strip()
    
    # Dispatch on the field type
    if 'email' in field_name.lower():
        return clean_email(value)
    elif 'phone' in field_name.lower():
        return clean_phone(value)
    elif 'date' in field_name.lower():
        return clean_date(value)
    elif 'price' in field_name.lower() or 'amount' in field_name.lower():
        return clean_number(value)
    else:
        return value

def clean_email(email):
    """Clean an email address."""
    email = email.lower().strip()
    # Simple email format check
    if '@' in email and '.' in email.split('@')[1]:
        return email
    return ''

def clean_phone(phone):
    """Clean a phone number."""
    # Strip non-digit characters
    digits = re.sub(r'\D', '', phone)
    
    if len(digits) == 11:  # Chinese mobile number
        return digits
    elif len(digits) == 10:  # landline
        return digits
    else:
        return ''

def clean_date(date_str):
    """Clean a date."""
    # Try several date formats
    formats = ['%Y-%m-%d', '%Y/%m/%d', '%Y年%m月%d日', '%d/%m/%Y']
    
    for fmt in formats:
        try:
            dt = datetime.strptime(date_str, fmt)
            return dt.strftime('%Y-%m-%d')  # normalize the format
        except ValueError:
            continue
    
    return date_str  # unparseable: return the original value

def clean_number(number_str):
    """Clean a number."""
    # Strip currency symbols, thousands separators, etc.
    cleaned = re.sub(r'[^\d\.-]', '', number_str)
    
    try:
        return str(float(cleaned))
    except ValueError:
        return '0'

def is_valid_row(row):
    """Check whether a row is valid."""
    # A name is required
    if not row.get('name', '').strip():
        return False
    
    # The email must be valid (if present)
    email = row.get('email', '')
    if email and '@' not in email:
        return False
    
    return True

def create_dirty_data():
    """Create a test file containing dirty data."""
    dirty_data = [
        ['name', 'email', 'phone', 'join_date', 'amount'],
        [' 张三 ', 'ZHANGSAN@EXAMPLE.COM', '138-0013-8000', '2023-01-15', '¥1,000.50'],
        ['李四', 'lisi.example.com', '010-62345678', '2023/02/20', '2,500.00'],
        ['', 'wangwu@example.com', '13912345678', '2023年03月15日', '1500'],
        ['赵六', 'zhaoliu@example', '123-4567', '无效日期', 'ABC'],
        ['钱七', 'qianqi@example.com', '15012345678', '20/04/2023', '3,000.00'],
    ]
    
    with open('dirty_data.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(dirty_data)
    
    print("Dirty test file created: dirty_data.csv")

if __name__ == "__main__":
    # Create the test data
    create_dirty_data()
    
    # Clean it
    clean_csv_file('dirty_data.csv', 'cleaned_data.csv')
    
    # Show before/after
    print("\nBefore and after cleaning:")
    print("-" * 50)
    
    with open('dirty_data.csv', 'r', encoding='utf-8') as f:
        print("Original data:")
        for line in f:
            print("  ", line.strip())
    
    print("\nCleaned data:")
    with open('cleaned_data.csv', 'r', encoding='utf-8') as f:
        for line in f:
            print("  ", line.strip())

2.3 Merging and Splitting Files

At work you often deal with multiple CSV files, for example monthly data stored in separate files that must be merged before analysis:

# merge_split_csv.py
import csv
import os
from glob import glob

def merge_csv_files(pattern, output_file):
    """
    Merge multiple CSV files.
    
    Args:
        pattern: glob pattern to match files, e.g. 'data/*.csv'
        output_file: path of the merged output file
    """
    all_data = []
    headers = None
    
    for filename in glob(pattern):
        print(f"Processing file: {filename}")
        
        with open(filename, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            # Check that the headers are consistent
            if headers is None:
                headers = reader.fieldnames
            elif headers != reader.fieldnames:
                print(f"Warning: header mismatch in {filename}, skipping")
                continue
            
            # Collect the rows
            for row in reader:
                all_data.append(row)
    
    if not all_data:
        print("No data found to merge")
        return
    
    # Write the merged file
    with open(output_file, 'w', encoding='utf-8', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=headers)
        writer.writeheader()
        writer.writerows(all_data)
    
    print(f"Merge complete: {len(all_data)} rows saved to {output_file}")

def split_csv_by_column(input_file, column_name, output_dir):
    """
    Split a CSV file by the values of one column.
    
    Args:
        input_file: the input file
        column_name: the column to split on
        output_dir: the output directory
    """
    # Make sure the output directory exists
    os.makedirs(output_dir, exist_ok=True)
    
    # Group the rows by column value
    groups = {}
    
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        headers = reader.fieldnames
        
        for row in reader:
            key = row.get(column_name, 'unknown')
            
            if key not in groups:
                groups[key] = []
            
            groups[key].append(row)
    
    # Write one file per group
    for key, rows in groups.items():
        # Build a filesystem-safe filename
        safe_key = "".join(c for c in str(key) if c.isalnum() or c in (' ', '-', '_')).rstrip()
        output_file = os.path.join(output_dir, f"{safe_key}.csv")
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=headers)
            writer.writeheader()
            writer.writerows(rows)
        
        print(f"Created file: {output_file} ({len(rows)} rows)")
    
    print(f"Split complete: {len(groups)} files created")

def create_monthly_data():
    """Create monthly test data."""
    months = ['2023-01', '2023-02', '2023-03']
    
    for month in months:
        filename = f"monthly_data_{month}.csv"
        data = [
            ['date', 'product', 'sales', 'region'],
            [f'{month}-10', 'Product-A', '1000', 'North'],
            [f'{month}-15', 'Product-B', '1500', 'South'],
            [f'{month}-20', 'Product-A', '800', 'East'],
        ]
        
        with open(filename, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file)
            writer.writerows(data)
        
        print(f"Created: {filename}")

if __name__ == "__main__":
    print("=== CSV Merge & Split Demo ===\n")
    
    # Create the test data
    print("1. Creating monthly test data...")
    create_monthly_data()
    
    # Merge the files
    print("\n2. Merging monthly data...")
    merge_csv_files('monthly_data_*.csv', 'merged_quarterly.csv')
    
    # Split a file
    print("\n3. Splitting data by region...")
    split_csv_by_column('merged_quarterly.csv', 'region', 'split_by_region')
    
    # Show the results
    print("\n" + "=" * 50)
    print("Generated files:")
    for file in ['merged_quarterly.csv', 'split_by_region']:
        if os.path.exists(file):
            if os.path.isdir(file):
                print(f"Directory: {file}/")
                for f in os.listdir(file):
                    print(f"  - {f}")
            else:
                print(f"File: {file}")

3. Advanced Techniques

3.1 Handling Special CSV Formats

Not every CSV file is so "standard". Let's look at how to handle some special cases:

# special_csv_formats.py
import csv

def read_excel_csv():
    """
    Handle CSV exported from Excel.
    Excel exports often carry a BOM or a non-UTF-8 encoding.
    """
    files_to_try = [
        ('utf-8-sig', 'excel_utf8_bom.csv'),  # UTF-8 with BOM
        ('gbk', 'excel_gbk.csv'),             # GBK encoding (Chinese Windows)
        ('utf-8', 'excel_utf8.csv'),          # plain UTF-8
    ]
    
    for encoding, filename in files_to_try:
        try:
            with open(filename, 'r', encoding=encoding) as file:
                reader = csv.reader(file)
                data = list(reader)
            
            print(f"Successfully read {filename} (encoding: {encoding})")
            print(f"  Header: {data[0]}")
            print(f"  Rows: {len(data)}")
            break
        except (UnicodeDecodeError, FileNotFoundError):
            continue

def handle_custom_delimiters():
    """Handle custom delimiters."""
    # A semicolon-delimited file
    with open('european_format.csv', 'w', encoding='utf-8') as f:
        f.write("name;age;city\n")
        f.write("张三;30;北京\n")
        f.write("李四;25;上海\n")
    
    # Specify the delimiter when reading
    with open('european_format.csv', 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=';')
        for row in reader:
            print(f"Delimiter ';': {row}")
    
    # A TSV file (tab-delimited)
    with open('data.tsv', 'w', encoding='utf-8') as f:
        f.write("name\tage\tcity\n")
        f.write("张三\t30\t北京\n")
        f.write("李四\t25\t上海\n")
    
    with open('data.tsv', 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        for row in reader:
            print(f"Delimiter '\\t': {row}")

def handle_quoted_data():
    """Handle quoted data."""
    # Data containing commas and quotes
    tricky_data = [
        ['name', 'description', 'price'],
        ['Apple', 'Red, delicious apple', '5.8'],
        ['Banana', 'Yellow "sweet" banana', '3.2'],
        ['Orange Juice', 'Fresh, 100% pure orange "juice"', '12.5'],
    ]
    
    with open('quoted_data.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file, quoting=csv.QUOTE_ALL)  # quote every field
        writer.writerows(tricky_data)
    
    print("\nQuoted data file created")
    
    # The reader handles the quotes for us
    with open('quoted_data.csv', 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        for row in reader:
            print(f"Parsed row: {row}")

def handle_large_files():
    """Memory-friendly ways to process large files."""
    print("\nStrategies for large files:")
    print("1. Stream row by row instead of loading everything into memory")
    print("2. Process in batches, especially when sorting or aggregating")
    print("3. Use generators to avoid memory build-up")
    
    def process_large_file(filename, chunk_size=1000):
        """Yield rows from a large file in chunks."""
        with open(filename, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            chunk = []
            for row in reader:
                chunk.append(row)
                
                if len(chunk) >= chunk_size:
                    yield chunk
                    chunk = []
            
            if chunk:  # the final partial chunk
                yield chunk
    
    # Create a small stand-in file so the demo below has something to read
    with open('large_data.csv', 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['id', 'value'])
        writer.writerows([[i, i * 2] for i in range(5)])
    
    # Simulated processing
    print("\nSimulated chunked processing of a large file:")
    for chunk_num, chunk in enumerate(process_large_file('large_data.csv', chunk_size=2), 1):
        print(f"  Processing chunk {chunk_num}: {len(chunk)} rows")
        # Each chunk can be handled here,
        # e.g. written to a database or aggregated

if __name__ == "__main__":
    print("=== Handling Special CSV Formats ===\n")
    
    # Custom delimiters
    print("1. Custom delimiter handling:")
    handle_custom_delimiters()
    
    # Quoted data
    print("\n2. Quoted data handling:")
    handle_quoted_data()
    
    # Large files
    print("\n3. Large-file strategies:")
    handle_large_files()

3.2 Performance Optimization Tips

# csv_performance.py
import csv
import time
import random

def create_large_csv(filename, num_rows=100000):
    """Create a large test CSV file."""
    print(f"Creating large test file: {filename} ({num_rows} rows)")
    
    with open(filename, 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        
        # Write the header
        writer.writerow(['id', 'name', 'value', 'category', 'timestamp'])
        
        # Write the data
        for i in range(num_rows):
            writer.writerow([
                i + 1,
                f'Item-{random.randint(1, 1000)}',
                random.uniform(1, 1000),
                random.choice(['A', 'B', 'C', 'D']),
                f'2023-{random.randint(1,12):02d}-{random.randint(1,28):02d}'
            ])
    
    print("File created")

def naive_processing(filename):
    """Naive approach: load everything at once."""
    print("\nMethod 1: load all data at once")
    start = time.time()
    
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        all_data = list(reader)  # loads the entire file into memory
    
    # Process the data
    total = 0
    for row in all_data:
        total += float(row['value'])
    
    elapsed = time.time() - start
    print(f"  Processed {len(all_data)} rows")
    print(f"  Sum: {total:.2f}")
    print(f"  Time: {elapsed:.2f} s")
    
    return elapsed

def stream_processing(filename):
    """Stream processing: one row at a time."""
    print("\nMethod 2: streaming (one row at a time)")
    start = time.time()
    
    total = 0
    count = 0
    
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        
        for row in reader:
            total += float(row['value'])
            count += 1
    
    elapsed = time.time() - start
    print(f"  Processed {count} rows")
    print(f"  Sum: {total:.2f}")
    print(f"  Time: {elapsed:.2f} s")
    
    return elapsed

def batch_processing(filename, batch_size=10000):
    """Batch processing: handle the data in batches."""
    print(f"\nMethod 3: batching ({batch_size} rows per batch)")
    start = time.time()
    
    total = 0
    count = 0
    batch = []
    
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        
        for row in reader:
            batch.append(float(row['value']))
            count += 1
            
            if len(batch) >= batch_size:
                # Process one batch
                total += sum(batch)
                batch = []  # reset the batch
        
        # Process the final batch
        if batch:
            total += sum(batch)
    
    elapsed = time.time() - start
    print(f"  Processed {count} rows")
    print(f"  Sum: {total:.2f}")
    print(f"  Time: {elapsed:.2f} s")
    
    return elapsed

def optimized_writing(num_rows=10000):
    """Compare write strategies."""
    print(f"\nWrite performance ({num_rows} rows):")
    
    # Method 1: write row by row
    start = time.time()
    with open('slow_write.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['id', 'value'])
        
        for i in range(num_rows):
            writer.writerow([i, i * 1.5])  # one writerow call per row
    
    elapsed1 = time.time() - start
    print(f"  Row-by-row: {elapsed1:.2f} s")
    
    # Method 2: bulk write
    start = time.time()
    with open('fast_write.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['id', 'value'])
        
        # Prepare all rows up front
        all_data = [[i, i * 1.5] for i in range(num_rows)]
        writer.writerows(all_data)  # write every row in one call
    
    elapsed2 = time.time() - start
    print(f"  Bulk write: {elapsed2:.2f} s")
    print(f"  Speedup: {elapsed1/elapsed2:.1f}x")

if __name__ == "__main__":
    print("=== CSV Performance Tips ===\n")
    
    # Create the test data
    test_file = 'large_test_data.csv'
    create_large_csv(test_file, num_rows=50000)
    
    # Benchmark the different approaches
    times = []
    times.append(('Load-all', naive_processing(test_file)))
    times.append(('Streaming', stream_processing(test_file)))
    times.append(('Batching', batch_processing(test_file)))
    
    # Write-performance test
    optimized_writing(num_rows=10000)
    
    # Summary
    print("\n" + "=" * 50)
    print("Performance summary:")
    for method, t in times:
        print(f"  {method}: {t:.2f} s")

3.3 Hands-On: Building a CSV Toolkit

Let's build a practical CSV tool that bundles the most common operations:

# csv_toolkit.py
"""
CSV Toolkit

A practical command-line tool bundling common CSV operations.
"""
import csv
import argparse
import sys
import os

class CSVToolkit:
    """CSV processing toolkit."""
    
    def __init__(self):
        self.commands = {
            'view': self.view_csv,
            'head': self.head_csv,
            'tail': self.tail_csv,
            'stats': self.csv_stats,
            'filter': self.filter_csv,
            'select': self.select_columns,
            'sample': self.sample_csv,
        }
    
    def run(self):
        """Run the toolkit."""
        parser = argparse.ArgumentParser(description='CSV Toolkit')
        parser.add_argument('command', choices=self.commands.keys(), 
                          help='operation to perform')
        parser.add_argument('file', help='path to the CSV file')
        parser.add_argument('-o', '--output', help='output file path')
        parser.add_argument('-n', '--lines', type=int, default=10,
                          help='number of rows to show (for head/tail/sample)')
        parser.add_argument('-c', '--columns', help='columns to select, comma-separated')
        parser.add_argument('-f', '--filter', help='filter condition, format: column=value')
        
        args = parser.parse_args()
        
        # Make sure the file exists
        if not os.path.exists(args.file):
            print(f"Error: file '{args.file}' does not exist")
            sys.exit(1)
        
        # Dispatch the command
        self.commands[args.command](args)
    
    def view_csv(self, args):
        """Pretty-print the contents of the CSV file."""
        with open(args.file, 'r', encoding='utf-8') as file:
            reader = csv.reader(file)
            
            for i, row in enumerate(reader):
                # Align the columns
                formatted = ' | '.join(f'{cell:<20}' for cell in row)
                print(formatted)
                
                # Separator line under the header
                if i == 0:
                    print('-' * len(formatted))
    
    def head_csv(self, args):
        """Show the first n rows of the file."""
        with open(args.file, 'r', encoding='utf-8') as file:
            reader = csv.reader(file)
            
            for i, row in enumerate(reader):
                if i >= args.lines + 1:  # +1 to include the header
                    break
                print(', '.join(row))
    
    def tail_csv(self, args):
        """Show the last n rows of the file."""
        with open(args.file, 'r', encoding='utf-8') as file:
            lines = file.readlines()
            
        # Print the last n lines
        for line in lines[-args.lines:]:
            print(line.rstrip())
    
    def csv_stats(self, args):
        """Show summary statistics for the CSV file."""
        with open(args.file, 'r', encoding='utf-8') as file:
            reader = csv.reader(file)
            data = list(reader)
        
        if not data:
            print("File is empty")
            return
        
        headers = data[0]
        rows = data[1:]
        
        print(f"File: {args.file}")
        print(f"Rows: {len(rows)} (excluding header)")
        print(f"Columns: {len(headers)}")
        print(f"Column names: {', '.join(headers)}")
        
        # Show a sample value for each column
        if rows:
            print("\nColumn info:")
            for i, header in enumerate(headers):
                sample = rows[0][i] if len(rows[0]) > i else ''
                print(f"  {header}: sample='{sample}'")
    
    def filter_csv(self, args):
        """Filter rows by a column value."""
        if not args.filter:
            print("Error: a filter condition is required (-f column=value)")
            return
        
        # Parse the filter condition
        col_name, value = args.filter.split('=', 1)
        
        with open(args.file, 'r', encoding='utf-8') as infile:
            reader = csv.DictReader(infile)
            headers = reader.fieldnames
            
            # Collect the matching rows
            matching_rows = []
            for row in reader:
                if row.get(col_name) == value:
                    matching_rows.append(row)
        
        # Output the result
        if args.output:
            with open(args.output, 'w', encoding='utf-8', newline='') as outfile:
                writer = csv.DictWriter(outfile, fieldnames=headers)
                writer.writeheader()
                writer.writerows(matching_rows)
            print(f"Filter result saved to: {args.output} ({len(matching_rows)} rows)")
        else:
            # Print to the console
            writer = csv.DictWriter(sys.stdout, fieldnames=headers)
            writer.writeheader()
            writer.writerows(matching_rows)
    
    def select_columns(self, args):
        """Keep only the selected columns."""
        if not args.columns:
            print("Error: columns to select are required (-c col1,col2,...)")
            return
        
        selected = [col.strip() for col in args.columns.split(',')]
        
        with open(args.file, 'r', encoding='utf-8') as infile:
            reader = csv.DictReader(infile)
            
            # Warn about missing columns
            for col in selected:
                if col not in reader.fieldnames:
                    print(f"Warning: column '{col}' does not exist")
            
            # Keep only the selected columns
            filtered_rows = []
            for row in reader:
                filtered_row = {col: row[col] for col in selected if col in row}
                filtered_rows.append(filtered_row)
        
        # Output the result
        if args.output:
            with open(args.output, 'w', encoding='utf-8', newline='') as outfile:
                writer = csv.DictWriter(outfile, fieldnames=selected)
                writer.writeheader()
                writer.writerows(filtered_rows)
            print(f"Result saved to: {args.output}")
        else:
            writer = csv.DictWriter(sys.stdout, fieldnames=selected)
            writer.writeheader()
            writer.writerows(filtered_rows)
    
    def sample_csv(self, args):
        """Take a random sample of rows."""
        import random
        
        with open(args.file, 'r', encoding='utf-8') as file:
            reader = csv.reader(file)
            data = list(reader)
        
        if len(data) <= 1:
            print("Not enough data in the file")
            return
        
        headers = data[0]
        rows = data[1:]
        
        # Draw the random sample
        sample_size = min(args.lines, len(rows))
        sampled_rows = random.sample(rows, sample_size)
        
        # Output the result
        output_data = [headers] + sampled_rows
        
        if args.output:
            with open(args.output, 'w', encoding='utf-8', newline='') as file:
                writer = csv.writer(file)
                writer.writerows(output_data)
            print(f"Sample saved to: {args.output} ({sample_size} rows)")
        else:
            for row in output_data:
                print(', '.join(row))

def create_example_csv():
    """Create a sample CSV file."""
    data = [
        ['id', 'name', 'age', 'city', 'salary'],
        ['1', '张三', '30', '北京', '50000'],
        ['2', '李四', '25', '上海', '45000'],
        ['3', '王五', '35', '广州', '60000'],
        ['4', '赵六', '28', '深圳', '55000'],
        ['5', '钱七', '32', '杭州', '52000'],
        ['6', '孙八', '29', '南京', '48000'],
        ['7', '周九', '31', '成都', '51000'],
        ['8', '吴十', '27', '武汉', '47000'],
    ]
    
    with open('employees.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(data)
    
    print("Sample file created: employees.csv")

def demo_toolkit():
    """Demo the toolkit features."""
    print("=== CSV Toolkit Demo ===\n")
    
    # Create the sample file
    create_example_csv()
    
    toolkit = CSVToolkit()
    
    # Simulate command-line arguments
    class Args:
        pass
    
    args = Args()
    args.file = 'employees.csv'
    args.lines = 5
    args.columns = 'name,city,salary'
    args.filter = 'city=北京'
    args.output = None
    
    print("\n1. View file contents:")
    print("-" * 50)
    toolkit.view_csv(args)
    
    print("\n2. First 3 rows:")
    print("-" * 50)
    args.lines = 3
    toolkit.head_csv(args)
    
    print("\n3. File statistics:")
    print("-" * 50)
    toolkit.csv_stats(args)
    
    print("\n4. Select specific columns:")
    print("-" * 50)
    toolkit.select_columns(args)
    
    print("\n5. Filter data:")
    print("-" * 50)
    toolkit.filter_csv(args)
    
    print("\n6. Random sample:")
    print("-" * 50)
    args.lines = 2
    toolkit.sample_csv(args)

if __name__ == "__main__":
    if len(sys.argv) > 1:
        # Command-line mode
        toolkit = CSVToolkit()
        toolkit.run()
    else:
        # Demo mode
        demo_toolkit()
        
        print("\n" + "=" * 50)
        print("Usage:")
        print("  Command line: python csv_toolkit.py <command> <file> [options]")
        print("\nAvailable commands:")
        for cmd in CSVToolkit().commands:
            print(f"  {cmd}")

4. Caveats

4.1 Limitations

  1. Not for nested data: CSV is a flat structure and a poor fit for complex nested data
  2. Type information is lost: every value comes back as a string and must be converted manually (see the sketch after this list)
  3. No schema validation: you have to verify data integrity and consistency yourself
  4. Performance limits: for files of several GB, prefer a database or a specialized tool
  5. Concurrent access: CSV files are not suited to simultaneous reads and writes from multiple processes
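To illustrate point 2, here is a minimal sketch of converting fields right after reading; the column names match the sales.csv example from section 2.1:

import csv

with open('sales.csv', 'r', encoding='utf-8') as f:
    for row in csv.DictReader(f):
        # Every value arrives as a str and must be converted explicitly.
        quantity = int(row['quantity'])
        price = float(row['price'])
        print(row['product'], quantity * price)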

4.2 Frequently Asked Questions

Q: What if I hit encoding errors when reading a CSV file?

A: Try these encodings in order: utf-8-sig, gbk, utf-8, latin-1:

encodings = ['utf-8-sig', 'gbk', 'utf-8', 'latin-1']
for enc in encodings:
    try:
        with open('file.csv', 'r', encoding=enc) as f:
            content = f.read()
        print(f"使用编码: {enc}")
        break
    except UnicodeDecodeError:
        continue

Q: My data contains commas and the columns get misaligned. What do I do?

A: Use csv.reader instead of splitting lines by hand; it correctly handles commas inside quoted fields, as the comparison below shows.
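A quick comparison on the same input line (a minimal sketch, not from the original article's code):

import csv
import io

line = '"Apple, Red",5.8\n'

# Naive splitting breaks the quoted field apart:
print(line.strip().split(','))              # ['"Apple', ' Red"', '5.8']

# csv.reader respects the quotes:
print(next(csv.reader(io.StringIO(line))))  # ['Apple, Red', '5.8']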

Q: The file is too big and I run out of memory. What now?

A: Stream it, processing one row at a time:

with open('large.csv', 'r', encoding='utf-8') as file:
    reader = csv.DictReader(file)
    for row in reader:  # only one row is held in memory at a time
        process(row)    # process() stands in for your own row handler

Q: How do I skip blank lines in a CSV file?

A: csv.DictReader skips completely blank lines for you; with plain csv.reader a blank line comes back as an empty list, and lines containing only whitespace still need explicit handling in your code, as sketched below.
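A minimal sketch covering both cases ('data.csv' as before):

import csv

with open('data.csv', 'r', encoding='utf-8', newline='') as f:
    for row in csv.reader(f):
        # A fully blank line parses to an empty list; a whitespace-only
        # line parses to cells that strip down to nothing.
        if not row or all(not cell.strip() for cell in row):
            continue
        print(row)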

Q: Processing CSV files is slow. What can I do?

A:

  1. Use csv.writerows() for bulk writes
  2. Consider pandas for complex operations (see the sketch below)
  3. For very large datasets, consider a database
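If you do reach for pandas, reading in chunks keeps memory bounded. A minimal sketch, assuming pandas is installed and the file has a numeric 'value' column ('large.csv' is a placeholder):

import pandas as pd

total = 0.0
# chunksize makes read_csv return an iterator of DataFrames
# instead of loading the whole file at once.
for chunk in pd.read_csv('large.csv', chunksize=100_000):
    total += chunk['value'].sum()

print(total)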

4.3 Alternatives

  1. pandas: for complex data analysis and manipulation
  2. A database: for large volumes of data and complex queries
  3. Excel/Google Sheets: when data must be viewed and edited by hand
  4. JSON/YAML: for nested, structured data
  5. Parquet/Feather: for big-data and machine-learning workloads

5. Summary

Having worked through this article, you should now have a good grasp of the following:

The core value of the csv module

  1. Simple and direct: the most straightforward way to handle tabular data
  2. Highly portable: virtually every system can import and export CSV
  3. Zero cost: bundled with Python, no extra dependencies
  4. Transparent and controllable: plain text makes problems easy to diagnose
  5. Rich ecosystem: integrates cleanly with the rest of your toolchain

That concludes this detailed guide to reading and writing files with Python's csv module. For more on working with CSV files in Python, see the other related articles on 脚本之家 (jb51.net)!
