Pandas高效读取CSV、Excel和SQL数据库的数据
作者:小庄-Python办公
本节学习目标
完成本节学习后,你将能够:
- 使用 pandas 从 CSV、TSV 等文本文件读取数据
- 读取 Excel 文件并处理多工作表
- 解决常见的编码问题和数据格式异常
- 使用分块读取处理超过内存容量的大文件
- 连接 SQLite 和 MySQL 数据库并执行查询
- 掌握数据读取的性能优化技巧
为什么学这个
在前面的课程中,我们用 NumPy 生成了各种模拟数据。但现实世界中,数据不会自动出现在你的代码里——它们散落在各处:
- 业务系统导出的 CSV 文件
- 同事发来的 Excel 表格
- 公司数据库中的 SQL 表
- 网页上的 JSON 数据
- 传感器记录的 日志文件
数据分析的第一步,就是把这些分散的数据"搬"到你的分析环境中。如果这一步做不好,后面的一切分析都是空中楼阁。
比喻:如果把数据分析比作做菜,那么数据载入就是"采购食材"。食材买错了(格式不对)、买少了(数据不全)、或者买太多搬不动(内存溢出),这道菜就做不成了。
好消息是,pandas 提供了统一且强大的数据读取接口。无论你面对什么格式的数据,几乎都能用 pd.read_xxx() 一行代码搞定。但实际情况往往没那么简单——编码错误、格式异常、文件过大……这些问题才是真正考验你功力的地方。
核心知识点讲解
pandas 简介——数据分析的"瑞士军刀"
pandas 是基于 NumPy 构建的数据分析库,它的名字来源于 Panel Data(面板数据)和 Python Data Science。
pandas 的两个核心数据结构:
- Series:一维带标签的数组(可以理解为"增强版的一维 NumPy 数组")
- DataFrame:二维表格(可以理解为"增强版的 Excel 表格")
import pandas as pd
import numpy as np
# Series 示例
s = pd.Series([85, 92, 78, 96],
index=['张三', '李四', '王五', '赵六'],
name='数学成绩')
print("Series:")
print(s)
# DataFrame 示例
df = pd.DataFrame({
'姓名': ['张三', '李四', '王五', '赵六'],
'数学': [85, 92, 78, 96],
'英语': [88, 85, 92, 79],
'语文': [76, 80, 85, 88]
})
print("\nDataFrame:")
print(df)
读取 CSV 文件——最常见数据格式
基本读取
CSV(Comma-Separated Values,逗号分隔值)是最常见的数据交换格式。
import pandas as pd
# 首先,我们创建一个 CSV 文件用于演示
data = {
'订单ID': range(1, 11),
'商品': ['手机', '电脑', '平板', '耳机', '键盘',
'鼠标', '显示器', '音箱', '充电器', '数据线'],
'价格': [2999, 5999, 1999, 199, 399,
99, 1599, 499, 79, 29],
'数量': [2, 1, 3, 5, 2,
4, 1, 2, 10, 15],
'日期': pd.date_range('2024-01-01', periods=10)
}
df = pd.DataFrame(data)
df.to_csv('sample_orders.csv', index=False, encoding='utf-8-sig')
print("示例 CSV 文件已创建")
# 读取 CSV 文件
df_read = pd.read_csv('sample_orders.csv')
print("\n读取的 DataFrame:")
print(df_read)
print(f"\n数据形状: {df_read.shape}")
print(f"数据类型:\n{df_read.dtypes}")
read_csv 常用参数详解
# 演示各种参数的用法
# 假设我们有一个稍微复杂一点的 CSV 文件
complex_csv = """ID,Name,Score,Grade,Comment
001,张三,85.5,A,
002,李四,92.0,B,优秀
003,王五,,C,需要努力
004,赵六,78.5,A,进步很大
005,,88.0,B,继续加油"""
# 先写入文件
with open('complex_data.csv', 'w', encoding='utf-8') as f:
f.write(complex_csv)
# 1. 基本读取
print("1. 基本读取:")
print(pd.read_csv('complex_data.csv'))
# 2. 指定列名
print("\n2. 指定列名:")
print(pd.read_csv('complex_data.csv',
names=['编号', '姓名', '分数', '等级', '评语'],
header=0))
# 3. 只读取指定列
print("\n3. 只读姓名列和分数列:")
print(pd.read_csv('complex_data.csv', usecols=['Name', 'Score']))
# 4. 处理缺失值
print("\n4. 默认缺失值处理:")
df = pd.read_csv('complex_data.csv')
print(df.isna().sum()) # 统计每列缺失值数量
# 5. 指定缺失值标记
print("\n5. 自定义缺失值标记:")
df = pd.read_csv('complex_data.csv', na_values=['', ' '])
print(df.isna().sum())
# 6. 解析日期列
df_with_date = pd.read_csv('sample_orders.csv', parse_dates=['日期'])
print("\n6. 日期列解析:")
print(df_with_date['日期'].dtype) # 应该是 datetime64
# 7. 指定数据类型
print("\n7. 指定数据类型:")
df_typed = pd.read_csv('sample_orders.csv',
dtype={'订单ID': str, '商品': 'category'})
print(df_typed.dtypes)
# 8. 跳过行
print("\n8. 跳过前2行:")
print(pd.read_csv('complex_data.csv', skiprows=2))
# 9. 读取指定行数(用于预览大文件)
print("\n9. 只读前3行:")
print(pd.read_csv('sample_orders.csv', nrows=3))
# 10. 设置索引列
print("\n10. 设置索引列为订单ID:")
df_idx = pd.read_csv('sample_orders.csv', index_col='订单ID')
print(df_idx)
TSV 和其他分隔符
# TSV(制表符分隔)
tsv_data = "ID\tName\tScore\n1\t张三\t85\n2\t李四\t92"
with open('data.tsv', 'w', encoding='utf-8') as f:
f.write(tsv_data)
print("TSV 文件读取:")
df_tsv = pd.read_csv('data.tsv', sep='\t') # 或直接用 pd.read_table()
print(df_tsv)
# 其他分隔符示例
pipe_data = "ID|Name|Score\n1|张三|85\n2|李四|92"
with open('data_pipe.txt', 'w', encoding='utf-8') as f:
f.write(pipe_data)
print("\n管道符分隔读取:")
df_pipe = pd.read_csv('data_pipe.txt', sep='|')
print(df_pipe)
读取 Excel 文件——职场中最常见的格式
基本读取
import pandas as pd
# 创建示例 Excel 文件
df1 = pd.DataFrame({
'姓名': ['张三', '李四', '王五'],
'部门': ['技术部', '市场部', '财务部'],
'工资': [15000, 12000, 13000]
})
df2 = pd.DataFrame({
'姓名': ['赵六', '钱七'],
'部门': ['技术部', '人事部'],
'工资': [16000, 11000]
})
# 写入 Excel(需要 openpyxl 库)
with pd.ExcelWriter('sample_data.xlsx', engine='openpyxl') as writer:
df1.to_excel(writer, sheet_name='员工信息', index=False)
df2.to_excel(writer, sheet_name='新员工', index=False)
print("Excel 文件已创建")
# 读取 Excel
# 注意:需要安装 openpyxl: pip install openpyxl
print("\n读取第一个工作表:")
df_excel = pd.read_excel('sample_data.xlsx', sheet_name='员工信息')
print(df_excel)
# 查看所有工作表名
print("\n所有工作表:")
xls = pd.ExcelFile('sample_data.xlsx')
print(xls.sheet_names)
# 读取所有工作表
print("\n读取所有工作表:")
all_sheets = pd.read_excel('sample_data.xlsx', sheet_name=None)
for name, df in all_sheets.items():
print(f"\n--- {name} ---")
print(df)
Excel 读取的实用技巧
# 1. 跳过表头行
# 有些 Excel 文件前两行是标题和说明
with pd.ExcelWriter('demo_skip.xlsx', engine='openpyxl') as writer:
pd.DataFrame({'A': [1, 2, 3]}).to_excel(writer, sheet_name='data')
# 2. 指定列的数据类型
df = pd.read_excel('sample_data.xlsx', sheet_name='员工信息',
dtype={'工资': float})
# 3. 读取指定范围(需要引擎支持)
# df = pd.read_excel('file.xlsx', usecols='A:C', nrows=100)
# 4. 处理 Excel 中的空行
df = pd.read_excel('sample_data.xlsx', sheet_name='员工信息',
skip_blank_lines=True)
重要提示:读取 Excel 文件需要安装 openpyxl(用于 .xlsx)或 xlrd(用于旧版 .xls)库。安装方法:pip install openpyxl。
编码问题处理——数据载入的"头号杀手"
什么是编码问题?
当你尝试读取一个 CSV 文件时,可能会看到这样的错误:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd0 in position 0
这是因为文件的编码方式与读取时指定的编码不一致。
常见编码方式
| 编码 | 说明 | 适用场景 |
|---|---|---|
| UTF-8 | 国际通用编码,支持所有语言 | 现代系统默认 |
| GBK / GB2312 | 中文编码 | Windows 中文系统 |
| GB18030 | 扩展中文编码 | 生僻字支持 |
| Latin-1 | 西欧编码 | 欧洲语言 |
| ASCII | 基础编码 | 纯英文 |
如何判断文件编码
# 方法1:使用 chardet 库自动检测
# pip install chardet
import chardet
with open('sample_orders.csv', 'rb') as f:
raw_data = f.read(10000) # 读取前10000字节
result = chardet.detect(raw_data)
print(f"检测到的编码: {result['encoding']}")
print(f"置信度: {result['confidence']:.2%}")
# 方法2:用 pandas 的 errors 参数处理
# ignore:忽略无法解码的字符
# replace:用 ? 替换无法解码的字符
# surrogateescape:特殊处理(Python 3.1+)
# 方法3:尝试多种编码
def read_with_encoding(filepath, encodings=['utf-8', 'gbk', 'gb18030', 'latin-1']):
for enc in encodings:
try:
df = pd.read_csv(filepath, encoding=enc)
print(f"成功使用 {enc} 编码读取")
return df
except UnicodeDecodeError:
continue
raise ValueError("无法用任何编码读取文件")
df = read_with_encoding('sample_orders.csv')
实际解决方案
# Windows 系统导出的 CSV 通常使用 GBK 编码
# 解决方案1:指定编码
df = pd.read_csv('data.csv', encoding='gbk')
# 解决方案2:使用 utf-8-sig(处理带 BOM 的 UTF-8)
# BOM (Byte Order Mark) 是 UTF-8 文件开头的特殊标记
df = pd.read_csv('data.csv', encoding='utf-8-sig')
# 解决方案3:用 Notepad++ 或 VS Code 转换文件编码
# 打开文件 -> 转换为 UTF-8 -> 保存
最佳实践:在团队协作中,统一规定所有数据文件使用 UTF-8 编码,可以省去大量麻烦。
大数据分块读取——处理超内存文件
什么时候需要分块读取?
当你的 CSV 文件有 10GB,而你的电脑只有 8GB 内存时,直接 pd.read_csv() 会失败(MemoryError)。
分块读取的策略是:每次只读取一小部分,处理完后释放内存,再读取下一部分。
import pandas as pd
import numpy as np
# 先创建一个大文件用于演示(模拟 100 万行数据)
np.random.seed(42)
n_rows = 1_000_000
large_df = pd.DataFrame({
'id': range(n_rows),
'value1': np.random.normal(100, 20, n_rows),
'value2': np.random.normal(50, 10, n_rows),
'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
'date': pd.date_range('2020-01-01', periods=n_rows, freq='min')
})
large_df.to_csv('large_data.csv', index=False)
print(f"已创建包含 {n_rows} 行数据的文件")
方式1:使用 chunksize 参数
# 每次读取 10 万行
chunk_size = 100_000
total_rows = 0
total_value1 = 0
total_value2 = 0
for chunk in pd.read_csv('large_data.csv', chunksize=chunk_size):
total_rows += len(chunk)
total_value1 += chunk['value1'].sum()
total_value2 += chunk['value2'].sum()
print(f"已处理 {total_rows} 行...")
print(f"\n总行数: {total_rows}")
print(f"value1 总和: {total_value1:,.2f}")
print(f"value2 总和: {total_value2:,.2f}")
方式2:分块聚合(计算统计量)
# 计算大文件的整体统计信息
chunk_size = 100_000
stats = {
'count': 0,
'sum_v1': 0, 'sum_v2': 0,
'sum_sq_v1': 0, 'sum_sq_v2': 0,
'min_v1': float('inf'), 'max_v1': float('-inf'),
'min_v2': float('inf'), 'max_v2': float('-inf')
}
for chunk in pd.read_csv('large_data.csv',
usecols=['value1', 'value2'], # 只读需要的列
chunksize=chunk_size):
stats['count'] += len(chunk)
stats['sum_v1'] += chunk['value1'].sum()
stats['sum_v2'] += chunk['value2'].sum()
stats['sum_sq_v1'] += (chunk['value1'] ** 2).sum()
stats['sum_sq_v2'] += (chunk['value2'] ** 2).sum()
stats['min_v1'] = min(stats['min_v1'], chunk['value1'].min())
stats['max_v1'] = max(stats['max_v1'], chunk['value1'].max())
stats['min_v2'] = min(stats['min_v2'], chunk['value2'].min())
stats['max_v2'] = max(stats['max_v2'], chunk['value2'].max())
# 计算均值和标准差
mean_v1 = stats['sum_v1'] / stats['count']
mean_v2 = stats['sum_v2'] / stats['count']
std_v1 = (stats['sum_sq_v1'] / stats['count'] - mean_v1 ** 2) ** 0.5
std_v2 = (stats['sum_sq_v2'] / stats['count'] - mean_v2 ** 2) ** 0.5
print("大文件统计信息:")
print(f" value1: 均值={mean_v1:.2f}, 标准差={std_v1:.2f}, "
f"范围=[{stats['min_v1']:.2f}, {stats['max_v1']:.2f}]")
print(f" value2: 均值={mean_v2:.2f}, 标准差={std_v2:.2f}, "
f"范围=[{stats['min_v2']:.2f}, {stats['max_v2']:.2f}]")
方式3:迭代器模式(更灵活的控制)
# 使用 iterator 参数,手动控制迭代
reader = pd.read_csv('large_data.csv', chunksize=50000, iterator=True)
# 可以跳过某些 chunk
for i, chunk in enumerate(reader):
if i == 0:
print(f"第 {i} 块,形状: {chunk.shape}")
# 保存第一块作为参考
sample = chunk.head()
# 只处理 category 为 A 的数据
filtered = chunk[chunk['category'] == 'A']
if len(filtered) > 0:
pass # 处理过滤后的数据
分块读取的注意事项
- 只读需要的列:使用
usecols参数,减少内存占用 - 指定数据类型:使用
dtype参数,避免 pandas 自动推断导致的内存浪费 - 跳过不需要的行:使用
skiprows参数
# 优化版分块读取
optimized_chunks = pd.read_csv(
'large_data.csv',
chunksize=100_000,
usecols=['id', 'value1', 'category'], # 只读3列
dtype={'id': 'int32', 'value1': 'float32', 'category': 'category'} # 节省内存
)
for chunk in optimized_chunks:
# 处理每个块
pass
连接数据库——从 SQL 读取数据
SQLite——内置数据库(无需安装服务器)
import pandas as pd
import sqlite3
# 创建 SQLite 数据库
conn = sqlite3.connect('sample_database.db')
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY,
name TEXT,
department TEXT,
salary REAL,
hire_date TEXT
)
''')
# 插入数据
employees = [
(1, '张三', '技术部', 15000, '2020-03-15'),
(2, '李四', '市场部', 12000, '2021-06-01'),
(3, '王五', '财务部', 13000, '2019-11-20'),
(4, '赵六', '技术部', 18000, '2018-08-10'),
(5, '钱七', '人事部', 11000, '2022-01-05'),
]
cursor.executemany('INSERT OR REPLACE INTO employees VALUES (?, ?, ?, ?, ?)',
employees)
conn.commit()
# 方式1:直接读取整个表
print("1. 读取整个表:")
df_sqlite = pd.read_sql('SELECT * FROM employees', conn)
print(df_sqlite)
# 方式2:带条件的查询
print("\n2. 查询技术部员工:")
df_tech = pd.read_sql(
"SELECT name, salary FROM employees WHERE department = '技术部'",
conn
)
print(df_tech)
# 方式3:使用 read_sql_query(等价于 read_sql)
print("\n3. 平均工资查询:")
df_avg = pd.read_sql_query(
"SELECT department, AVG(salary) as avg_salary, COUNT(*) as count "
"FROM employees GROUP BY department",
conn
)
print(df_avg)
# 方式4:使用 read_sql_table(读取整张表,SQLite 不支持)
# df = pd.read_sql_table('employees', conn) # 需要 SQLAlchemy
# 方式5:DataFrame 直接写入数据库
df_new = pd.DataFrame({
'id': [6, 7],
'name': ['孙八', '周九'],
'department': ['技术部', '市场部'],
'salary': [16000, 13500],
'hire_date': ['2023-05-10', '2023-08-20']
})
df_new.to_sql('employees', conn, if_exists='append', index=False)
print("\n4. 新增数据后:")
print(pd.read_sql('SELECT * FROM employees', conn))
# 关闭连接
conn.close()
MySQL 连接(需要安装 pymysql)
# 注意:以下代码需要 MySQL 服务器运行
# 安装: pip install pymysql
# 连接 MySQL
# conn_mysql = create_engine('mysql+pymysql://username:password@localhost:3306/database_name')
# 读取数据
# df = pd.read_sql('SELECT * FROM table_name', conn_mysql)
# 示例代码(注释状态,实际运行需要 MySQL)
"""
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://root:password@localhost:3306/mydb')
# 读取数据
df = pd.read_sql('SELECT * FROM users', engine)
# 写入数据
df.to_sql('users_backup', engine, if_exists='replace', index=False)
# 复杂查询
df = pd.read_sql('''
SELECT d.department_name,
AVG(e.salary) as avg_salary,
COUNT(e.id) as emp_count
FROM employees e
JOIN departments d ON e.dept_id = d.id
GROUP BY d.department_name
ORDER BY avg_salary DESC
''', engine)
"""
提示:如果你使用 PostgreSQL,安装 psycopg2 包,连接字符串改为 postgresql+psycopg2://...。
其他数据格式读取
JSON
import json
# 创建示例 JSON
json_data = [
{"name": "张三", "age": 25, "city": "北京"},
{"name": "李四", "age": 30, "city": "上海"},
{"name": "王五", "age": 28, "city": "广州"}
]
with open('sample.json', 'w', encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False)
# 读取 JSON
df_json = pd.read_json('sample.json')
print("JSON 读取:")
print(df_json)
# 复杂 JSON(嵌套结构)
complex_json = '''
{
"employees": {
"张三": {"age": 25, "dept": "技术部"},
"李四": {"age": 30, "dept": "市场部"}
}
}
'''
with open('complex.json', 'w', encoding='utf-8') as f:
f.write(complex_json)
# orient 参数处理不同 JSON 格式
df_complex = pd.read_json('complex.json', orient='index')
print("\n复杂 JSON 读取:")
print(df_complex)
HTML 表格
# 读取网页中的表格
# url = 'https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)'
# tables = pd.read_html(url)
# print(f"页面中找到 {len(tables)} 个表格")
# print(tables[0].head()) # 第一个表格
# 这里用本地 HTML 演示
html_table = """
<table>
<tr><th>姓名</th><th>成绩</th></tr>
<tr><td>张三</td><td>85</td></tr>
<tr><td>李四</td><td>92</td></tr>
<tr><td>王五</td><td>78</td></tr>
</table>
"""
with open('sample.html', 'w', encoding='utf-8') as f:
f.write(html_table)
df_html = pd.read_html('sample.html')[0]
print("HTML 表格读取:")
print(df_html)
代码示例:综合实战
实战1:多源数据整合分析
import pandas as pd
import numpy as np
import sqlite3
# 场景:整合来自不同数据源的信息
# 数据源1:CSV 文件(员工基本信息)
emp_csv = pd.DataFrame({
'emp_id': [1, 2, 3, 4, 5],
'name': ['张三', '李四', '王五', '赵六', '钱七'],
'gender': ['男', '女', '男', '男', '女'],
'birth_year': [1990, 1988, 1992, 1985, 1993]
})
emp_csv.to_csv('employees_basic.csv', index=False, encoding='utf-8-sig')
# 数据源2:Excel 文件(绩效数据)
perf_excel = pd.DataFrame({
'emp_id': [1, 2, 3, 4, 5],
'q1_score': [85, 92, 78, 96, 88],
'q2_score': [88, 90, 82, 94, 85],
'q3_score': [90, 88, 85, 92, 90]
})
with pd.ExcelWriter('employees_perf.xlsx', engine='openpyxl') as writer:
perf_excel.to_excel(writer, sheet_name='绩效', index=False)
# 数据源3:SQLite 数据库(薪资数据)
conn = sqlite3.connect('salary.db')
pd.DataFrame({
'emp_id': [1, 2, 3, 4, 5],
'base_salary': [15000, 12000, 13000, 18000, 11000],
'bonus': [3000, 5000, 2000, 8000, 2500]
}).to_sql('salary', conn, if_exists='replace', index=False)
# 整合分析
print("=" * 50)
print("多源数据整合分析")
print("=" * 50)
# 读取各数据源
basic = pd.read_csv('employees_basic.csv')
perf = pd.read_excel('employees_perf.xlsx', sheet_name='绩效')
salary = pd.read_sql('SELECT * FROM salary', conn)
# 合并数据
merged = basic.merge(perf, on='emp_id').merge(salary, on='emp_id')
merged['avg_score'] = merged[['q1_score', 'q2_score', 'q3_score']].mean(axis=1)
merged['total_income'] = merged['base_salary'] + merged['bonus']
print("\n整合后的完整数据:")
print(merged.to_string())
print("\n各季度平均绩效:")
print(f" Q1: {perf['q1_score'].mean():.1f}")
print(f" Q2: {perf['q2_score'].mean():.1f}")
print(f" Q3: {perf['q3_score'].mean():.1f}")
print("\n平均薪资:")
print(f" 基本薪资均值: ¥{salary['base_salary'].mean():,.0f}")
print(f" 奖金均值: ¥{salary['bonus'].mean():,.0f}")
print(f" 总收入均值: ¥{merged['total_income'].mean():,.0f}")
conn.close()
实战2:大文件处理流水线
import pandas as pd
import numpy as np
# 创建模拟日志文件(50 万行)
np.random.seed(42)
n = 500_000
log_data = pd.DataFrame({
'timestamp': pd.date_range('2024-01-01', periods=n, freq='s'),
'user_id': np.random.randint(1, 10000, n),
'action': np.random.choice(['click', 'view', 'purchase', 'logout'], n,
p=[0.5, 0.3, 0.15, 0.05]),
'page': np.random.choice(['home', 'product', 'cart', 'checkout', 'help'], n),
'duration_ms': np.random.exponential(2000, n).astype(int)
})
log_data.to_csv('web_logs.csv', index=False)
print(f"已创建 {n} 行日志数据")
# 分块读取并聚合
print("\n分块处理日志数据:")
stats = {}
chunk_size = 50_000
for chunk in pd.read_csv('web_logs.csv',
usecols=['action', 'user_id', 'duration_ms'],
chunksize=chunk_size):
# 按 action 分组统计
action_counts = chunk['action'].value_counts()
for action, count in action_counts.items():
stats[action] = stats.get(action, 0) + count
# 统计总用户数
stats['unique_users'] = len(chunk['user_id'].unique())
print(f"处理 {chunk_size} 行...")
print("\n统计结果:")
for action, count in stats.items():
print(f" {action}: {count:,}" if action != 'unique_users'
else f" 独立用户数: {count:,}")
实战练习
练习1:CSV 读取与清洗
题目:创建一个包含脏数据的 CSV 文件,然后读取并清洗:
- 包含空值
- 包含重复行
- 包含错误类型(如价格列出现了文本)
参考答案:
import pandas as pd
import numpy as np
# 创建脏数据
dirty_csv = """id,name,price,quantity
1,手机,2999,2
2,电脑,5999,1
3,,1999,3
4,耳机,abc,5
5,键盘,399,2
1,手机,2999,2
6,鼠标,99,
"""
with open('dirty_data.csv', 'w', encoding='utf-8') as f:
f.write(dirty_csv)
# 读取并清洗
df = pd.read_csv('dirty_data.csv')
print("原始数据:")
print(df)
# 1. 删除完全重复的行
df = df.drop_duplicates()
# 2. 删除 name 为空的行
df = df.dropna(subset=['name'])
# 3. 处理 price 列的文本(转换为数字,无效的变 NaN)
df['price'] = pd.to_numeric(df['price'], errors='coerce')
# 4. 填充 quantity 的缺失值为 1
df['quantity'] = df['quantity'].fillna(1).astype(int)
# 5. 删除 price 仍然为 NaN 的行
df = df.dropna(subset=['price'])
print("\n清洗后数据:")
print(df)
练习2:分块统计
题目:用分块读取的方式,计算大文件中每个 category 的平均值。
参考答案:
import pandas as pd
import numpy as np
from collections import defaultdict
# 先创建大文件
np.random.seed(42)
n = 200_000
big = pd.DataFrame({
'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
'value': np.random.normal(50, 15, n)
})
big.to_csv('big_categories.csv', index=False)
# 分块统计
cat_stats = defaultdict(lambda: {'sum': 0, 'count': 0})
for chunk in pd.read_csv('big_categories.csv', chunksize=50000):
for cat, group in chunk.groupby('category'):
cat_stats[cat]['sum'] += group['value'].sum()
cat_stats[cat]['count'] += len(group)
print("分类统计:")
for cat, stats in sorted(cat_stats.items()):
mean = stats['sum'] / stats['count']
print(f" {cat}: 均值={mean:.2f}, 数量={stats['count']}")
练习3:数据库查询整合
题目:创建一个 SQLite 数据库,包含订单表和客户表,然后用 JOIN 查询每个客户的总消费金额。
参考答案:
import pandas as pd
import sqlite3
conn = sqlite3.connect('shop.db')
# 创建客户表
customers = pd.DataFrame({
'customer_id': [1, 2, 3, 4, 5],
'name': ['张三', '李四', '王五', '赵六', '钱七'],
'city': ['北京', '上海', '广州', '深圳', '杭州']
})
customers.to_sql('customers', conn, if_exists='replace', index=False)
# 创建订单表
orders = pd.DataFrame({
'order_id': range(1, 11),
'customer_id': [1, 2, 1, 3, 2, 4, 1, 5, 3, 2],
'amount': [500, 800, 300, 1200, 600, 400, 700, 900, 350, 550]
})
orders.to_sql('orders', conn, if_exists='replace', index=False)
# JOIN 查询
result = pd.read_sql('''
SELECT c.name, c.city,
COUNT(o.order_id) as order_count,
SUM(o.amount) as total_spent,
AVG(o.amount) as avg_order
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id
ORDER BY total_spent DESC
''', conn)
print("客户消费分析:")
print(result.to_string())
conn.close()
本节总结
本节我们全面学习了从各种数据源读取数据的方法。关键要点:
CSV 读取是基本功:掌握 pd.read_csv() 的各种参数——sep、encoding、usecols、dtype、parse_dates、nrows、chunksize 等。
编码问题是日常痛点:Windows 中文系统导出的文件通常用 GBK 编码。遇到问题时,尝试 encoding='gbk' 或 encoding='utf-8-sig'。使用 chardet 库可以自动检测编码。
分块读取解决大文件:当文件超过内存容量时,使用 chunksize 参数分块处理。配合 usecols 和 dtype 参数可以进一步优化内存。
数据库读取很灵活:pd.read_sql() 可以执行任意 SQL 查询,直接在数据库层面完成过滤和聚合,只读取需要的结果,效率最高。
性能优化三板斧:
- 只读需要的列(
usecols) - 指定数据类型(
dtype),特别是category类型可以大幅节省内存 - 在数据库层面完成过滤,减少数据传输量
多种格式统一接口:无论是 CSV、Excel、JSON 还是数据库,pandas 都提供了 read_xxx() 的统一接口,降低了学习成本。
实用建议:在实际工作中,先花 5 分钟检查数据文件的编码、格式和大小,再决定用哪种方式读取。这一步可以避免后面 90% 的问题。
下一节预告
恭喜!你已经完成了第一阶段的全部内容。回顾一下你掌握的技能:
- 环境配置与 Jupyter Notebook 使用
- NumPy 数组的创建、运算和广播
- 高级索引、维度变换和矩阵运算
- 向量化思维,让代码效率提升百倍
- 从各种数据源高效读取数据
接下来,我们将进入第二阶段,开始学习 pandas 的核心功能——数据处理与分析。你将掌握:
- DataFrame 的基本操作和行列选择
- 数据清洗与预处理
- 数据分组与聚合(groupby)
- 数据合并与连接(merge/concat)
pandas 才是数据分析的主力工具,让我们继续前进!
以上就是Pandas高效读取CSV、Excel和SQL数据库的数据的详细内容,更多关于Pandas数据读取的资料请关注脚本之家其它相关文章!
