python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python拆分CSV

Python实现批量将CSV按行或按列拆分

作者:小庄-Python办公

这篇文章主要介绍了一个基于Python标准库csv和PyQt5的高性能CSV拆分工具,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

本文完整拆解一个可用于生产的 CSV 拆分工具,从架构、核心算法、GUI 与多线程到异常与性能边界,全部基于 Python 标准库 csv 与 PyQt5 实现。

效果图

需求与目标

核心目标是实现一个桌面 GUI:

总体设计

项目结构极简,仅一个入口脚本:

设计重点是把“耗时 IO 与 CPU”放在后台线程,把“交互与展示”留给主线程。

CSV 拆分核心逻辑

核心函数是 split_csv_file,仅使用 csv.reader/csv.writer,满足低内存与高兼容性。

1. 统一设置与结果模型

通过 SplitSettings 与 FileResult 两个数据类分别描述输入配置与输出状态:

2. 按行拆分

按行拆分包含两种子模式:

按固定行数

关键点:

示例片段(仅为理解):

if count == 0:
    seq_text = format_sequence(seq, settings.seq_format)
    filename = f"{settings.prefix}{seq_text}{settings.suffix}.csv"
    out_file = os.path.join(settings.output_dir, filename)

按唯一列值

核心策略是使用 WriterCache 复用输出文件句柄,避免每行开关文件:

3. 按列拆分与二次按行

按列拆分本质上也是“按唯一值分组输出”,并可选二次按行拆分:

4. 文件名安全与序号格式

为防止非法字符,统一使用 sanitize_filename 处理列值:

序号格式支持两种:

GUI 设计与交互流

GUI 的核心思路是将界面拆分成多个可复用面板。

1. 文件选择与拖拽

FileListWidget 继承 QListWidget,实现拖拽导入:

2. 模式切换

按行/按列通过 QStackedWidget 互斥显示:

3. 参数联动与列名读取

当文件列表变化或编码/分隔符变化时,自动刷新列名:

4. 输出配置与命名规则

统一面板提供:

多线程与任务生命周期

耗时任务在 QThread 中执行:

调用流程:

进度与日志设计

进度条

进度分两层:

日志

日志输出包含时间戳与异常堆栈:

异常处理与容错策略

异常处理贯穿拆分流程:

设计要点:

性能与内存控制

针对 1GB 大文件设计要点:

内存占用主要来自:

对于“按唯一列值”的模式,如果唯一值极多:

源代码

'''
作者:小庄-Python办公
公众号:小庄-Python学习
'''
import sys
import os
import csv
import traceback
import errno
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

from PyQt5.QtCore import Qt, QObject, pyqtSignal, QThread
from PyQt5.QtGui import QFont
from PyQt5.QtWidgets import (
    QApplication,
    QMainWindow,
    QWidget,
    QFileDialog,
    QListWidget,
    QListWidgetItem,
    QVBoxLayout,
    QHBoxLayout,
    QLabel,
    QPushButton,
    QRadioButton,
    QButtonGroup,
    QStackedWidget,
    QGroupBox,
    QLineEdit,
    QSpinBox,
    QComboBox,
    QCheckBox,
    QTextEdit,
    QMessageBox,
    QProgressBar,
)


def timestamp() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS`` text."""
    layout = "%Y-%m-%d %H:%M:%S"
    now = datetime.now()
    return now.strftime(layout)


# Characters replaced by "_" in file names: the Windows-reserved punctuation
# plus every ASCII control character (which includes \n, \r, \t and NUL).
_UNSAFE_FILENAME_CHARS = str.maketrans(
    {ch: "_" for ch in '\\/:*?"<>|' + "".join(map(chr, range(32)))}
)


def sanitize_filename(value: str) -> str:
    """Turn an arbitrary cell value into text that is safe inside a file name.

    Args:
        value: The raw value; ``None`` and non-string objects are accepted
            and converted with ``str()``.

    Returns:
        The value with surrounding whitespace removed, reserved punctuation
        and control characters replaced by ``_``, leading/trailing spaces and
        dots removed, and the length capped at 120 characters.  ``"EMPTY"``
        is returned when nothing usable remains.
    """
    text = "" if value is None else str(value).strip()
    text = text.translate(_UNSAFE_FILENAME_CHARS).strip(" .")
    if not text:
        return "EMPTY"
    # Trim again after truncating: the cut can expose a trailing space or dot.
    # The result cannot become empty because the first character is neither.
    return text[:120].rstrip(" .")


def format_sequence(seq: int, pattern: str) -> str:
    """Render a sequence number according to ``pattern``.

    A pattern containing both ``{`` and ``}`` is treated as a ``str.format``
    template that receives ``seq`` as its only positional argument.  Any
    other pattern only contributes its length, which becomes the zero-padded
    width (at least 1).
    """
    is_template = "{" in pattern and "}" in pattern
    if not is_template:
        pad_to = max(1, len(pattern))
        return str(seq).zfill(pad_to)
    return pattern.format(seq)


def ensure_dir(path: str) -> None:
    """Create directory ``path`` (including parents) unless it already exists."""
    if os.path.isdir(path):
        return
    os.makedirs(path, exist_ok=True)


@dataclass
class SplitSettings:
    """Options that control how split_csv_file splits one CSV file."""

    # "row" selects the row-based paths; any other value (the GUI passes
    # "col") selects splitting by the values of a column.
    mode: str
    # Within row mode: "fixed" splits every N rows; any other value (the GUI
    # passes "unique") groups rows by the values of ``unique_column``.
    row_mode: str
    # Number of data rows per output file in row/fixed mode.
    rows_per_file: int
    # Header name of the grouping column in row/unique mode.
    unique_column: str
    # Header name of the grouping column in column mode.
    col_column: str
    # In column mode, whether each value's rows are further split into
    # numbered files.
    col_secondary_split: bool
    # Rows per numbered file when ``col_secondary_split`` is enabled.
    col_secondary_rows: int
    # Text placed before / after the sequence number or value in file names.
    prefix: str
    suffix: str
    # Passed to format_sequence: a str.format template if it contains braces,
    # otherwise its length is used as the zero-padding width.
    seq_format: str
    # Whether the source header row is written at the top of each output file.
    keep_header: bool
    # CSV delimiter and text encoding, used for both reading and writing.
    delimiter: str
    encoding: str
    # Directory that receives the output files.
    output_dir: str
    # "skip" leaves files that already exist untouched; any other value (the
    # GUI passes "overwrite") replaces them.
    on_exists: str
    # Limit handed to WriterCache for simultaneously open output files.
    max_open_files: int = 64


@dataclass
class FileResult:
    """Outcome of splitting one input file, as returned by split_csv_file."""

    # Path of the input CSV file.
    file_path: str
    # False when an error occurred or the user cancelled.
    success: bool
    # Output files that were created for this input.
    created_files: List[str] = field(default_factory=list)
    # Output paths that already existed and were left untouched.
    skipped_files: List[str] = field(default_factory=list)
    # Human-readable failure reason; empty on success.
    error_message: str = ""
    # The exception that aborted processing, if any.
    exception: Optional[Exception] = None
    # Formatted traceback of ``exception``; empty when there is none.
    traceback_text: str = ""


def read_header(file_path: str, encoding: str, delimiter: str) -> List[str]:
    """Return the first CSV record of ``file_path``, or ``[]`` for an empty file."""
    with open(file_path, "r", newline="", encoding=encoding) as source:
        for first_record in csv.reader(source, delimiter=delimiter):
            # Only the first record is needed; stop reading immediately.
            return first_record
    return []


class WriterCache:
    """Bounded least-recently-used cache of open CSV output files.

    Each entry maps a caller-chosen key to an open file and its csv writer.
    When more than ``max_open`` files are open, the least recently used one
    is closed; asking for it again reopens the file in append mode so no
    rows are lost and the header is not repeated.
    """

    def __init__(self, max_open: int, created_set: set):
        """Create an empty cache.

        Args:
            max_open: Maximum number of simultaneously open files.  Values
                below 1 are treated as 1, because a smaller limit would close
                the writer that was just opened before it is returned.
            created_set: Set of paths already created by this run; it is
                read and extended in place.
        """
        self.max_open = max(1, max_open)
        # Insertion order doubles as recency order: the first key is the
        # least recently used one, the last key the most recently used.
        self._writers: Dict[str, Tuple[object, csv.writer]] = {}
        self._created_set = created_set

    def get_writer(self, key: str, file_path: str, encoding: str, delimiter: str, header: Optional[List[str]], on_exists: str, created_files: List[str], skipped_files: List[str]) -> Optional[csv.writer]:
        """Return the csv writer for ``key``, opening ``file_path`` if needed.

        Args:
            key: Cache key identifying the output file.
            file_path: Path of the output file.
            encoding: Text encoding used to open the file.
            delimiter: CSV delimiter for the writer.
            header: Row written first when the file is newly created; a
                falsy value writes no header.
            on_exists: ``"skip"`` leaves a pre-existing file (one not created
                through this cache's ``created_set``) untouched.
            created_files: List that receives the path when a file is created.
            skipped_files: List that receives the path when it is skipped.

        Returns:
            The writer, or ``None`` when the file was skipped.

        Raises:
            OSError: If the file cannot be opened, or if closing an evicted
                file fails (its buffered rows could not be flushed).
        """
        cached = self._writers.pop(key, None)
        if cached is not None:
            # Re-inserting moves the entry to the most-recently-used end.
            self._writers[key] = cached
            return cached[1]

        file_exists = os.path.exists(file_path)
        created_here = file_path in self._created_set
        if file_exists and not created_here and on_exists == "skip":
            skipped_files.append(file_path)
            return None

        parent = os.path.dirname(file_path)
        if parent:  # a bare file name has no directory to create
            os.makedirs(parent, exist_ok=True)

        # A file this run already created is being reopened after eviction:
        # append to it and do not repeat the header.
        reopening = file_exists and created_here
        handle = open(file_path, "a" if reopening else "w", newline="", encoding=encoding)
        try:
            if not reopening:
                self._created_set.add(file_path)
                created_files.append(file_path)
            writer = csv.writer(handle, delimiter=delimiter)
            if header and not reopening:
                writer.writerow(header)
        except Exception:
            handle.close()
            raise
        self._writers[key] = (handle, writer)

        while len(self._writers) > self.max_open:
            oldest_key = next(iter(self._writers))
            evicted_handle, _ = self._writers.pop(oldest_key)
            # close() flushes buffered rows; a failure here means data loss,
            # so it is allowed to propagate instead of being ignored.
            evicted_handle.close()
        return writer

    def close_all(self) -> None:
        """Close every cached file and empty the cache.

        All files are closed even if some fail; the first failure is then
        re-raised so that an unflushed file is not reported as written.
        """
        first_error = None
        for handle, _ in self._writers.values():
            try:
                handle.close()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        self._writers.clear()
        if first_error is not None:
            raise first_error


# Number of input rows between two progress reports.
_PROGRESS_ROW_INTERVAL = 2000


def _mark_cancelled(result) -> None:
    """Record on ``result`` that the user aborted the run."""
    result.success = False
    result.error_message = "用户取消"


def _split_fixed_rows(reader, header_row, settings, result, cancel_cb, report_progress) -> None:
    """Write consecutive runs of ``settings.rows_per_file`` rows to numbered files."""
    seq = 1
    count = 0
    writer = None
    handle = None
    try:
        for row_index, row in enumerate(reader, 1):
            if cancel_cb():
                _mark_cancelled(result)
                break
            if count == 0:
                # First row of a new chunk: decide where (and whether) it goes.
                seq_text = format_sequence(seq, settings.seq_format)
                filename = f"{settings.prefix}{seq_text}{settings.suffix}.csv"
                out_file = os.path.join(settings.output_dir, filename)
                if os.path.exists(out_file) and settings.on_exists == "skip":
                    # No writer for this chunk: its rows are consumed but dropped.
                    result.skipped_files.append(out_file)
                else:
                    ensure_dir(settings.output_dir)
                    handle = open(out_file, "w", newline="", encoding=settings.encoding)
                    writer = csv.writer(handle, delimiter=settings.delimiter)
                    if header_row:
                        writer.writerow(header_row)
                    result.created_files.append(out_file)
                seq += 1
            if writer is not None:
                writer.writerow(row)
            count += 1
            if count >= settings.rows_per_file:
                count = 0
                finished, handle, writer = handle, None, None
                if finished is not None:
                    # close() flushes the chunk; a failure must surface.
                    finished.close()
            if row_index % _PROGRESS_ROW_INTERVAL == 0:
                report_progress()
    finally:
        # Also reached on errors and cancellation, so the file never leaks.
        if handle is not None:
            handle.close()


def _split_by_value(reader, header, header_row, column, chunk_rows, settings, result, cancel_cb, report_progress) -> None:
    """Route each row to a file named after its (sanitized) value in ``column``.

    With ``chunk_rows`` set, every value's rows are additionally spread over
    numbered files of at most ``chunk_rows`` rows each.
    """
    if column not in header:
        raise ValueError(f"列名不存在: {column}")
    col_index = header.index(column)
    cache = WriterCache(settings.max_open_files, set())
    # Per value: "seq" is the number of the next chunk to start, "count" the
    # rows already placed in the current chunk.
    chunk_state: Dict[str, Dict[str, int]] = {}
    skipped_values: set = set()

    def open_writer(cache_key, stem, header_for_new_file):
        out_file = os.path.join(settings.output_dir, f"{settings.prefix}{stem}{settings.suffix}.csv")
        return cache.get_writer(cache_key, out_file, settings.encoding, settings.delimiter, header_for_new_file, settings.on_exists, result.created_files, result.skipped_files)

    try:
        for row_index, row in enumerate(reader, 1):
            if cancel_cb():
                _mark_cancelled(result)
                break
            if row_index % _PROGRESS_ROW_INTERVAL == 0:
                report_progress()
            # Rows that are too short count as having an empty value.
            value = sanitize_filename(row[col_index] if col_index < len(row) else "")
            if value in skipped_values:
                continue

            if chunk_rows is None:
                writer = open_writer(value, value, header_row)
                if writer is None:
                    skipped_values.add(value)
                    continue
                writer.writerow(row)
                continue

            state = chunk_state.setdefault(value, {"seq": 1, "count": 0})
            starting_chunk = state["count"] == 0
            chunk_no = state["seq"] if starting_chunk else state["seq"] - 1
            seq_text = format_sequence(chunk_no, settings.seq_format)
            writer = open_writer(f"{value}:{chunk_no}", f"{value}_{seq_text}", header_row if starting_chunk else None)
            if starting_chunk:
                if writer is None:
                    # The chunk's file already exists and must be kept: drop
                    # this value's remaining rows.
                    skipped_values.add(value)
                    continue
                state["seq"] += 1
            if writer is not None:
                writer.writerow(row)
            state["count"] += 1
            if state["count"] >= chunk_rows:
                state["count"] = 0
    finally:
        # Also reached on errors and cancellation, so no output file leaks.
        cache.close_all()


def split_csv_file(
    file_path: str,
    settings: SplitSettings,
    log_cb: Callable[[str], None],
    progress_cb: Callable[[float], None],
    cancel_cb: Callable[[], bool],
) -> FileResult:
    """Split one CSV file according to ``settings``.

    The input is streamed row by row.  Errors are not raised to the caller;
    they are captured in the returned result.

    Args:
        file_path: Input CSV file.
        settings: Splitting mode, naming rules, encoding and output options.
        log_cb: Receives a log line when the run failed for lack of disk space.
        progress_cb: Receives the processed fraction of the input (0.0-1.0).
        cancel_cb: Polled once per row; a true result stops processing.

    Returns:
        A FileResult listing created and skipped files.  ``success`` is False
        after an error (details in ``error_message``, ``exception`` and
        ``traceback_text``) or after cancellation.
    """
    result = FileResult(file_path=file_path, success=True)
    file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
    file_size = max(1, file_size)

    def update_progress(file_obj) -> None:
        # tell() on the text layer raises while csv.reader is iterating the
        # file, so the position is taken from the underlying binary buffer.
        # It runs ahead by at most one read chunk, which is fine for a
        # progress bar.  Reporting is best-effort and must never abort a run.
        try:
            position_source = getattr(file_obj, "buffer", file_obj)
            ratio = min(1.0, position_source.tell() / file_size)
            progress_cb(ratio)
        except Exception:
            pass

    try:
        with open(file_path, "r", newline="", encoding=settings.encoding) as f:
            reader = csv.reader(f, delimiter=settings.delimiter)
            header = next(reader, None)
            if header is None:
                # Completely empty input: nothing to split.
                return result
            header_row = header if settings.keep_header else None

            def report_progress() -> None:
                update_progress(f)

            if settings.mode == "row" and settings.row_mode == "fixed":
                _split_fixed_rows(reader, header_row, settings, result, cancel_cb, report_progress)
            elif settings.mode == "row":
                _split_by_value(reader, header, header_row, settings.unique_column, None, settings, result, cancel_cb, report_progress)
            else:
                chunk_rows = settings.col_secondary_rows if settings.col_secondary_split else None
                _split_by_value(reader, header, header_row, settings.col_column, chunk_rows, settings, result, cancel_cb, report_progress)
            report_progress()
    except UnicodeDecodeError as e:
        result.success = False
        result.error_message = f"编码错误: {e}"
        result.exception = e
        result.traceback_text = traceback.format_exc()
    except Exception as e:
        result.success = False
        result.error_message = str(e)
        result.exception = e
        result.traceback_text = traceback.format_exc()

    if result.exception and isinstance(result.exception, OSError) and getattr(result.exception, "errno", None) == errno.ENOSPC:
        log_cb(f"{timestamp()} 磁盘空间不足: {result.exception}")
    return result


class Worker(QObject):
    """Splits a list of CSV files and reports through Qt signals.

    Signals:
        overall_progress: Percentage (0-100) across all files.
        file_progress: Percentage (0-100) of the file being processed.
        log: One log line.
        finished: Summary dict with keys "success", "failed", "cancelled".
        show_error: Title and detail text for an error dialog.
    """

    overall_progress = pyqtSignal(int)
    file_progress = pyqtSignal(int)
    log = pyqtSignal(str)
    finished = pyqtSignal(dict)
    show_error = pyqtSignal(str, str)

    def __init__(self, files: List[str], settings: SplitSettings):
        super().__init__()
        self.files = files
        self.settings = settings
        self._cancel = False
        # Every output file created so far, so a cancelled run can remove them.
        self._created_files: List[str] = []

    def cancel(self) -> None:
        """Ask the running job to stop; checked before each file and each row."""
        self._cancel = True

    def _is_cancelled(self) -> bool:
        return self._cancel

    @staticmethod
    def _size_of(path: str) -> int:
        """Return the file size used for progress weighting (at least 1).

        A file that is missing or unreadable is weighted as 1 instead of
        raising: the error is then reported per file by split_csv_file, and
        ``finished`` is still emitted.
        """
        try:
            return max(1, os.path.getsize(path))
        except OSError:
            return 1

    def run(self) -> None:
        """Process every file in order and emit ``finished`` with the summary."""
        sizes = [self._size_of(path) for path in self.files]
        total_size = sum(sizes)
        processed_size = 0
        success_count = 0
        fail_count = 0

        for file_path, file_size in zip(self.files, sizes):
            if self._cancel:
                break
            self.log.emit(f"{timestamp()} 开始处理: {file_path}")

            def progress_cb(ratio: float, done: int = processed_size, size: int = file_size) -> None:
                # ``done`` and ``size`` are bound per file through defaults.
                self.file_progress.emit(int(ratio * 100))
                overall_ratio = min(1.0, (done + ratio * size) / total_size)
                self.overall_progress.emit(int(overall_ratio * 100))

            result = split_csv_file(file_path, self.settings, self.log.emit, progress_cb, self._is_cancelled)
            self._created_files.extend(result.created_files)
            if result.success:
                success_count += 1
                self.log.emit(f"{timestamp()} 完成: {file_path} 输出 {len(result.created_files)} 个文件")
            else:
                fail_count += 1
                detail = result.error_message
                if result.traceback_text:
                    detail = f"{result.error_message}\n{result.traceback_text}"
                self.log.emit(f"{timestamp()} 失败: {file_path}\n{detail}")
                if isinstance(result.exception, OSError) and getattr(result.exception, "errno", None) == errno.ENOSPC:
                    self.show_error.emit("磁盘空间不足", detail)
            processed_size += file_size
            self.file_progress.emit(100)
            self.overall_progress.emit(int(min(1.0, processed_size / total_size) * 100))

        summary = {
            "success": success_count,
            "failed": fail_count,
            "cancelled": self._cancel,
        }
        if self._cancel:
            # Best-effort removal of everything this run created; a file that
            # cannot be deleted is left behind rather than failing the cleanup.
            for path in self._created_files:
                try:
                    if os.path.exists(path):
                        os.remove(path)
                except OSError:
                    pass
        self.finished.emit(summary)


class FileListWidget(QListWidget):
    """List of CSV paths that also accepts files dragged onto it."""

    def __init__(self):
        super().__init__()
        self.setAcceptDrops(True)
        self.setSelectionMode(QListWidget.ExtendedSelection)

    def dragEnterEvent(self, event):
        """Accept drags that carry URLs; defer everything else to Qt."""
        if not event.mimeData().hasUrls():
            super().dragEnterEvent(event)
            return
        event.acceptProposedAction()

    def dropEvent(self, event):
        """Add every dropped local ``.csv`` file to the list."""
        mime = event.mimeData()
        if not mime.hasUrls():
            super().dropEvent(event)
            return
        for url in mime.urls():
            local_path = url.toLocalFile()
            if local_path and local_path.lower().endswith(".csv"):
                self.add_file(local_path)
        event.acceptProposedAction()

    def add_file(self, path: str) -> None:
        """Append ``path`` unless an item with the same text already exists."""
        already_listed = any(
            self.item(index).text() == path for index in range(self.count())
        )
        if not already_listed:
            self.addItem(QListWidgetItem(path))

    def files(self) -> List[str]:
        """Return the listed paths in display order."""
        paths = []
        for index in range(self.count()):
            paths.append(self.item(index).text())
        return paths


class MainWindow(QMainWindow):
    """Main window: collects files and options, runs a Worker in a QThread.

    The column combo boxes are always filled from the header of the first
    listed file.
    """

    def __init__(self):
        super().__init__()
        self.setWindowTitle("CSV 拆分工具 作者:小庄-Python办公")
        self.resize(1000, 720)

        self.file_list = FileListWidget()
        self.add_btn = QPushButton("添加文件")
        self.remove_btn = QPushButton("移除选中")
        self.clear_btn = QPushButton("清空列表")

        self.mode_row = QRadioButton("按行拆分")
        self.mode_col = QRadioButton("按列拆分")
        self.mode_row.setChecked(True)
        self.mode_group = QButtonGroup()
        self.mode_group.addButton(self.mode_row)
        self.mode_group.addButton(self.mode_col)

        self.stack = QStackedWidget()
        self.row_panel = self._build_row_panel()
        self.col_panel = self._build_col_panel()
        self.stack.addWidget(self.row_panel)
        self.stack.addWidget(self.col_panel)

        self.prefix_input = QLineEdit("part_")
        self.suffix_input = QLineEdit("")
        self.seq_input = QLineEdit("0001")
        self.keep_header = QCheckBox("保留表头")
        self.keep_header.setChecked(True)

        self.output_dir_input = QLineEdit("")
        self.output_browse_btn = QPushButton("选择输出目录")
        self.delimiter_combo = QComboBox()
        # The tab delimiter is shown as the two characters "\t" and is
        # converted to a real tab in _delimiter_value.
        self.delimiter_combo.addItems([",", ";", "\\t", "|"])
        self.encoding_combo = QComboBox()
        self.encoding_combo.addItems(["UTF-8", "UTF-8-sig", "GBK"])
        self.exists_overwrite = QRadioButton("覆盖")
        self.exists_skip = QRadioButton("跳过")
        self.exists_overwrite.setChecked(True)

        self.start_btn = QPushButton("开始")
        self.cancel_btn = QPushButton("取消")
        self.cancel_btn.setEnabled(False)
        self.export_log_btn = QPushButton("导出日志")

        self.total_progress = QProgressBar()
        self.file_progress = QProgressBar()
        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        self.log_text.setFont(QFont("Consolas", 10))

        layout = QVBoxLayout()
        layout.addWidget(self._build_files_group())
        layout.addWidget(self._build_mode_group())
        layout.addWidget(self._build_naming_group())
        layout.addWidget(self._build_output_group())
        layout.addWidget(self._build_progress_group())
        layout.addWidget(self._build_log_group())

        container = QWidget()
        container.setLayout(layout)
        self.setCentralWidget(container)

        # True while _add_files is inserting, so that the rowsInserted hook
        # does not refresh the headers a second time.
        self._adding_from_dialog = False

        self.add_btn.clicked.connect(self._add_files)
        self.remove_btn.clicked.connect(self._remove_selected)
        self.clear_btn.clicked.connect(self._clear_files)
        self.mode_row.toggled.connect(self._switch_mode)
        self.mode_col.toggled.connect(self._switch_mode)
        self.output_browse_btn.clicked.connect(self._choose_output_dir)
        self.start_btn.clicked.connect(self._start)
        self.cancel_btn.clicked.connect(self._cancel)
        self.export_log_btn.clicked.connect(self._export_log)
        self.delimiter_combo.currentIndexChanged.connect(self._refresh_headers)
        self.encoding_combo.currentIndexChanged.connect(self._refresh_headers)
        self.file_list.itemSelectionChanged.connect(self._refresh_headers)
        # Files dropped onto the list bypass _add_files; watch the model so
        # the column combos are filled in that case as well.
        self.file_list.model().rowsInserted.connect(self._on_files_inserted)

        self.worker_thread: Optional[QThread] = None
        self.worker: Optional[Worker] = None

    def _build_files_group(self) -> QGroupBox:
        """Build the group with the add/remove/clear buttons and the file list."""
        group = QGroupBox("文件选择")
        layout = QVBoxLayout()
        btn_layout = QHBoxLayout()
        btn_layout.addWidget(self.add_btn)
        btn_layout.addWidget(self.remove_btn)
        btn_layout.addWidget(self.clear_btn)
        layout.addLayout(btn_layout)
        layout.addWidget(self.file_list)
        group.setLayout(layout)
        return group

    def _build_mode_group(self) -> QGroupBox:
        """Build the group with the mode radio buttons and the stacked panels."""
        group = QGroupBox("拆分模式")
        layout = QVBoxLayout()
        mode_layout = QHBoxLayout()
        mode_layout.addWidget(self.mode_row)
        mode_layout.addWidget(self.mode_col)
        layout.addLayout(mode_layout)
        layout.addWidget(self.stack)
        group.setLayout(layout)
        return group

    def _build_row_panel(self) -> QWidget:
        """Build the row-mode panel (fixed row count or unique column value)."""
        panel = QWidget()
        layout = QVBoxLayout()
        self.row_fixed = QRadioButton("按固定行数")
        self.row_unique = QRadioButton("按唯一列值")
        self.row_fixed.setChecked(True)
        self.row_group = QButtonGroup()
        self.row_group.addButton(self.row_fixed)
        self.row_group.addButton(self.row_unique)
        row_mode_layout = QHBoxLayout()
        row_mode_layout.addWidget(self.row_fixed)
        row_mode_layout.addWidget(self.row_unique)
        layout.addLayout(row_mode_layout)

        fixed_layout = QHBoxLayout()
        fixed_layout.addWidget(QLabel("每文件行数"))
        self.rows_per_file = QSpinBox()
        self.rows_per_file.setRange(1, 10_000_000)
        self.rows_per_file.setValue(100000)
        fixed_layout.addWidget(self.rows_per_file)
        layout.addLayout(fixed_layout)

        unique_layout = QHBoxLayout()
        unique_layout.addWidget(QLabel("列名"))
        self.row_unique_column = QComboBox()
        unique_layout.addWidget(self.row_unique_column)
        layout.addLayout(unique_layout)

        panel.setLayout(layout)
        return panel

    def _build_col_panel(self) -> QWidget:
        """Build the column-mode panel (column choice, optional row chunking)."""
        panel = QWidget()
        layout = QVBoxLayout()
        col_layout = QHBoxLayout()
        col_layout.addWidget(QLabel("列名"))
        self.col_column = QComboBox()
        col_layout.addWidget(self.col_column)
        layout.addLayout(col_layout)

        split_layout = QHBoxLayout()
        self.col_secondary = QCheckBox("每子文件再按行数拆分")
        split_layout.addWidget(self.col_secondary)
        split_layout.addWidget(QLabel("行数"))
        self.col_secondary_rows = QSpinBox()
        self.col_secondary_rows.setRange(1, 10_000_000)
        self.col_secondary_rows.setValue(100000)
        split_layout.addWidget(self.col_secondary_rows)
        layout.addLayout(split_layout)
        panel.setLayout(layout)
        return panel

    def _build_naming_group(self) -> QGroupBox:
        """Build the group with prefix, suffix, sequence format and header option."""
        group = QGroupBox("命名规则")
        layout = QVBoxLayout()
        row1 = QHBoxLayout()
        row1.addWidget(QLabel("前缀"))
        row1.addWidget(self.prefix_input)
        row1.addWidget(QLabel("后缀"))
        row1.addWidget(self.suffix_input)
        layout.addLayout(row1)
        row2 = QHBoxLayout()
        row2.addWidget(QLabel("序号格式"))
        row2.addWidget(self.seq_input)
        row2.addWidget(self.keep_header)
        layout.addLayout(row2)
        group.setLayout(layout)
        return group

    def _build_output_group(self) -> QGroupBox:
        """Build the group with output directory, delimiter, encoding, exists policy."""
        group = QGroupBox("输出配置")
        layout = QVBoxLayout()
        row1 = QHBoxLayout()
        row1.addWidget(QLabel("输出目录"))
        row1.addWidget(self.output_dir_input)
        row1.addWidget(self.output_browse_btn)
        layout.addLayout(row1)

        row2 = QHBoxLayout()
        row2.addWidget(QLabel("分隔符"))
        row2.addWidget(self.delimiter_combo)
        row2.addWidget(QLabel("编码"))
        row2.addWidget(self.encoding_combo)
        layout.addLayout(row2)

        row3 = QHBoxLayout()
        row3.addWidget(QLabel("已存在文件"))
        row3.addWidget(self.exists_overwrite)
        row3.addWidget(self.exists_skip)
        layout.addLayout(row3)
        group.setLayout(layout)
        return group

    def _build_progress_group(self) -> QGroupBox:
        """Build the group with both progress bars and the action buttons."""
        group = QGroupBox("进度")
        layout = QVBoxLayout()
        layout.addWidget(QLabel("总体进度"))
        layout.addWidget(self.total_progress)
        layout.addWidget(QLabel("当前文件进度"))
        layout.addWidget(self.file_progress)
        row = QHBoxLayout()
        row.addWidget(self.start_btn)
        row.addWidget(self.cancel_btn)
        row.addWidget(self.export_log_btn)
        layout.addLayout(row)
        group.setLayout(layout)
        return group

    def _build_log_group(self) -> QGroupBox:
        """Build the group that holds the read-only log view."""
        group = QGroupBox("日志")
        layout = QVBoxLayout()
        layout.addWidget(self.log_text)
        group.setLayout(layout)
        return group

    def _switch_mode(self) -> None:
        """Show the panel that belongs to the selected split mode."""
        self.stack.setCurrentIndex(0 if self.mode_row.isChecked() else 1)

    def _add_files(self) -> None:
        """Let the user pick CSV files, add them and refresh the column names."""
        files, _ = QFileDialog.getOpenFileNames(self, "选择 CSV 文件", "", "CSV Files (*.csv)")
        self._adding_from_dialog = True
        try:
            for path in files:
                self.file_list.add_file(path)
        finally:
            self._adding_from_dialog = False
        self._refresh_headers()

    def _on_files_inserted(self, _parent, first: int, _last: int) -> None:
        """Refresh the column names when a new first file arrives by drag and drop."""
        # Only row 0 determines the header; later rows cannot change it.
        if first == 0 and not self._adding_from_dialog:
            self._refresh_headers()

    def _remove_selected(self) -> None:
        """Remove the selected entries and refresh the column names."""
        for item in self.file_list.selectedItems():
            row = self.file_list.row(item)
            self.file_list.takeItem(row)
        self._refresh_headers()

    def _clear_files(self) -> None:
        """Empty the file list and drop the now stale column names."""
        self.file_list.clear()
        self._refresh_headers()

    def _choose_output_dir(self) -> None:
        """Let the user pick the output directory."""
        path = QFileDialog.getExistingDirectory(self, "选择输出目录")
        if path:
            self.output_dir_input.setText(path)

    def _delimiter_value(self) -> str:
        """Return the selected delimiter, mapping the "\\t" entry to a real tab."""
        text = self.delimiter_combo.currentText()
        return "\t" if text == "\\t" else text

    def _refresh_headers(self) -> None:
        """Fill both column combo boxes from the header of the first listed file."""
        files = self.file_list.files()
        if not files:
            self.row_unique_column.clear()
            self.col_column.clear()
            return
        path = files[0]
        try:
            header = read_header(path, self.encoding_combo.currentText(), self._delimiter_value())
            self.row_unique_column.clear()
            self.col_column.clear()
            self.row_unique_column.addItems(header)
            self.col_column.addItems(header)
        except Exception as e:
            # Wrong encoding, unreadable file, ...: report it and offer no columns.
            self.log_text.append(f"{timestamp()} 读取表头失败: {e}")
            self.row_unique_column.clear()
            self.col_column.clear()

    def _build_settings(self) -> Optional[SplitSettings]:
        """Collect the form values; return ``None`` after warning about bad input."""
        files = self.file_list.files()
        if not files:
            QMessageBox.warning(self, "提示", "请先选择 CSV 文件")
            return None
        output_dir = self.output_dir_input.text().strip()
        if not output_dir:
            QMessageBox.warning(self, "提示", "请指定输出目录")
            return None
        delimiter = self._delimiter_value()
        encoding = self.encoding_combo.currentText()
        on_exists = "overwrite" if self.exists_overwrite.isChecked() else "skip"
        mode = "row" if self.mode_row.isChecked() else "col"
        row_mode = "fixed" if self.row_fixed.isChecked() else "unique"
        unique_column = self.row_unique_column.currentText()
        col_column = self.col_column.currentText()
        # Value-based modes need a column; without this check every file
        # would fail later with a "column not found" error.
        if mode == "col":
            missing_column = not col_column
        else:
            missing_column = row_mode == "unique" and not unique_column
        if missing_column:
            QMessageBox.warning(self, "提示", "请先选择用于拆分的列名")
            return None
        settings = SplitSettings(
            mode=mode,
            row_mode=row_mode,
            rows_per_file=self.rows_per_file.value(),
            unique_column=unique_column,
            col_column=col_column,
            col_secondary_split=self.col_secondary.isChecked(),
            col_secondary_rows=self.col_secondary_rows.value(),
            prefix=self.prefix_input.text(),
            suffix=self.suffix_input.text(),
            seq_format=self.seq_input.text() or "0001",
            keep_header=self.keep_header.isChecked(),
            delimiter=delimiter,
            encoding=encoding,
            output_dir=output_dir,
            on_exists=on_exists,
        )
        return settings

    def _start(self) -> None:
        """Validate the form and start the worker thread."""
        settings = self._build_settings()
        if not settings:
            return
        files = self.file_list.files()
        try:
            ensure_dir(settings.output_dir)
        except OSError as e:
            # Invalid path or missing permission: tell the user instead of
            # letting the exception escape from the button slot.
            QMessageBox.critical(self, "失败", f"无法创建输出目录: {e}")
            return
        self.total_progress.setValue(0)
        self.file_progress.setValue(0)
        self.start_btn.setEnabled(False)
        self.cancel_btn.setEnabled(True)
        self.log_text.append(f"{timestamp()} 任务开始,文件数: {len(files)}")

        self.worker_thread = QThread()
        self.worker = Worker(files, settings)
        self.worker.moveToThread(self.worker_thread)
        self.worker_thread.started.connect(self.worker.run)
        self.worker.log.connect(self.log_text.append)
        self.worker.file_progress.connect(self.file_progress.setValue)
        self.worker.overall_progress.connect(self.total_progress.setValue)
        self.worker.finished.connect(self._finish)
        self.worker.show_error.connect(self._show_error)
        self.worker_thread.start()

    def _finish(self, summary: dict) -> None:
        """Stop the worker thread, re-enable the form and show the outcome."""
        if self.worker_thread:
            self.worker_thread.quit()
            self.worker_thread.wait()
        self.start_btn.setEnabled(True)
        self.cancel_btn.setEnabled(False)
        if summary.get("cancelled"):
            QMessageBox.information(self, "已取消", "任务已取消,临时文件已清理")
        else:
            QMessageBox.information(
                self,
                "完成",
                f"成功: {summary.get('success', 0)},失败: {summary.get('failed', 0)}",
            )

    def _cancel(self) -> None:
        """Ask the running worker to stop."""
        if self.worker:
            self.worker.cancel()
            self.log_text.append(f"{timestamp()} 用户取消任务")
            self.cancel_btn.setEnabled(False)

    def _show_error(self, title: str, detail: str) -> None:
        """Show an error dialog requested by the worker."""
        QMessageBox.critical(self, title, detail)

    def _export_log(self) -> None:
        """Save the log view's text to a file chosen by the user."""
        path, _ = QFileDialog.getSaveFileName(self, "导出日志", "log.txt", "Text Files (*.txt)")
        if not path:
            return
        try:
            with open(path, "w", encoding="utf-8") as f:
                f.write(self.log_text.toPlainText())
            QMessageBox.information(self, "成功", "日志已导出")
        except Exception as e:
            QMessageBox.critical(self, "失败", str(e))

def main() -> None:
    """Create the Qt application, show the main window and run the event loop."""
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()

可扩展方向

如果继续增强,可考虑:

结语

这个工具的核心理念是“稳定、低内存、高可用”。通过标准库 csv 与 PyQt5 的组合,达成了性能与交互的平衡,并保持项目结构简单、易维护。对于日常办公与大规模数据拆分,已具备可直接使用的工程级质量。

以上就是Python实现批量将CSV按行或按列拆分的详细内容,更多关于Python拆分CSV的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文