python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python PDF批量水印

Python+PyQt5实现PDF批量水印工具

作者:小庄-Python办公

本文围绕 PDF添加水印.py,完整讲解如何将单文件水印脚本重构为可交互的 PyQt5 桌面应用,支持拖拽导入、实时预览、子线程处理与输出管理,并给出关键实现细节与结构说明,需要的朋友可以参考下

PyQt5 的 PDF 批量水印工具:

本文围绕 PDF添加水印.py,完整讲解如何将单文件水印脚本重构为可交互的 PyQt5 桌面应用,支持拖拽导入、实时预览、子线程处理与输出管理,并给出关键实现细节与结构说明。

一、功能概览

这款工具面向“批量给 PDF 添加文字水印”的场景,核心功能如下:

二、整体架构与模块拆分

应用主要分为三层:

UI 层(WatermarkApp)
负责界面布局、参数采集、按钮交互、预览触发、状态展示等。

处理层(WatermarkWorker / PreviewWorker)
使用 QThread 执行批量水印与预览生成,避免阻塞主线程。

工具层(字体解析 / 输出重名 / 水印生成)
封装公共逻辑,确保主流程清晰且复用性强。

三、界面布局设计

界面布局遵循“左列表、右参数、底部操作区”的结构。

1. 左侧:PDF 文件列表 + 添加/删除

2. 右侧:参数面板(按顺序分组)

3. 底部:操作按钮 + 进度/状态

四、拖拽导入实现

自定义 PdfListWidget,重写拖拽相关事件:

这样用户可以直接把 PDF 拖进列表,极大提升效率。

五、实时预览机制

预览的核心思想是:

  1. 使用当前 UI 参数生成一个水印 PDF
  2. 将水印合成到原 PDF 的第一页
  3. 保存为临时预览文件并自动打开

实现上通过 PreviewWorker(QThread) 执行生成逻辑,以避免 UI 卡顿。
预览按钮会优先使用“选中项”,若未选中则默认使用第一个 PDF。

六、子线程批量处理

处理逻辑放在 WatermarkWorker(QThread) 中完成:

七、水印生成逻辑

水印生成使用 ReportLab,在内存中创建一页 PDF:

  1. 读取当前页面大小(确保适配不同尺寸 PDF)
  2. 在中心点平移 + 旋转
  3. 按固定网格绘制水印文本
  4. 输出为 BytesIO 内存对象

关键点:

八、字体选择与注册

字体下拉框基于系统字体列表:

字体注册则通过 Windows 注册表查询真实字体文件路径,然后用 ReportLab 注册:

  1. 在注册表中查找字体名称
  2. 拼接字体文件路径
  3. pdfmetrics.registerFont(TTFont(...))

这样既能保证预览字体一致,也能保证 PDF 输出字体可用。

九、输出策略与重名机制

输出统一保存到:

./output/原文件名_watermarked.pdf

如果文件已存在,会自动追加序号:

xxx_watermarked_1.pdf
xxx_watermarked_2.pdf

这一逻辑由 unique_output_path 负责,避免覆盖历史结果。

十、错误处理与弹窗提示

以下情况会触发弹窗警告:

这样可以避免用户在无感知情况下操作失败。

十一、运行方式

直接运行脚本即可启动 GUI:

python d:\测试\公众号水文\11-PDF添加水印\PDF添加水印.py

如需打包为 EXE,可使用 PyInstaller(另行说明)。

十二、总结与扩展建议

当前版本已经具备完整的批量水印能力,并具备以下优势:

后续可考虑的扩展方向:

如需进一步增强功能或增加更多参数,我可以继续扩展优化。

完整代码

import io
import os
import sys
import winreg

from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QFont, QFontDatabase, QPainter, QPixmap
from PyQt5.QtWidgets import (
    QApplication,
    QFileDialog,
    QGroupBox,
    QHBoxLayout,
    QLabel,
    QLineEdit,
    QListWidget,
    QListWidgetItem,
    QMessageBox,
    QPushButton,
    QProgressBar,
    QSlider,
    QSpinBox,
    QDoubleSpinBox,
    QVBoxLayout,
    QWidget,
    QComboBox,
    QAbstractItemView,
    QSizePolicy,
)
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont

try:
    from PyPDF2 import PdfReader, PdfWriter
except Exception:
    PdfReader = None
    PdfWriter = None


def find_font_file(font_family):
    fonts_dir = os.path.join(os.environ.get("WINDIR", "C:\\Windows"), "Fonts")
    try:
        with winreg.OpenKey(
            winreg.HKEY_LOCAL_MACHINE,
            r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Fonts",
        ) as key:
            index = 0
            while True:
                try:
                    name, data, _ = winreg.EnumValue(key, index)
                    index += 1
                except OSError:
                    break
                if font_family.lower() in name.lower():
                    if os.path.isabs(data):
                        return data
                    return os.path.join(fonts_dir, data)
    except OSError:
        pass
    return None


def unique_output_path(output_dir, base_name):
    output_path = os.path.join(output_dir, base_name)
    if not os.path.exists(output_path):
        return output_path
    name, ext = os.path.splitext(base_name)
    counter = 1
    while True:
        candidate = os.path.join(output_dir, f"{name}_{counter}{ext}")
        if not os.path.exists(candidate):
            return candidate
        counter += 1


def get_reportlab_font(font_family, font_cache):
    if font_family in font_cache:
        return font_cache[font_family]
    font_path = find_font_file(font_family)
    if not font_path or not os.path.exists(font_path):
        raise FileNotFoundError(f"字体文件缺失:{font_family}")
    font_key = f"font_{len(font_cache)}"
    pdfmetrics.registerFont(TTFont(font_key, font_path))
    font_cache[font_family] = font_key
    return font_key


def create_watermark_pdf_buffer(width, height, options, font_cache):
    buffer = io.BytesIO()
    watermark_canvas = canvas.Canvas(buffer, pagesize=(width, height))
    font_name = get_reportlab_font(options["font_family"], font_cache)
    watermark_canvas.setFont(font_name, options["font_size"])
    watermark_canvas.setFillAlpha(options["opacity"] / 100)
    watermark_canvas.translate(width / 2, height / 2)
    watermark_canvas.rotate(options["angle"])
    x_step = width / 5
    y_step = height / 10 * (options["line_spacing"] / 100)
    for i in range(5):
        for j in range(10):
            a = (i - 2) * x_step
            b = (j - 4) * y_step
            watermark_canvas.drawString(a, b, options["text"])
    watermark_canvas.save()
    buffer.seek(0)
    return buffer


class PdfListWidget(QListWidget):
    def __init__(self, parent=None):
        super().__init__(parent)
        self.setAcceptDrops(True)
        self.setSelectionMode(QAbstractItemView.ExtendedSelection)

    def dragEnterEvent(self, event):
        if event.mimeData().hasUrls():
            event.acceptProposedAction()
        else:
            event.ignore()

    def dragMoveEvent(self, event):
        if event.mimeData().hasUrls():
            event.acceptProposedAction()
        else:
            event.ignore()

    def dropEvent(self, event):
        if not event.mimeData().hasUrls():
            return
        for url in event.mimeData().urls():
            path = url.toLocalFile()
            if path.lower().endswith(".pdf"):
                self.add_pdf_item(path)

    def add_pdf_item(self, path):
        for i in range(self.count()):
            if self.item(i).text() == path:
                return
        item = QListWidgetItem(path)
        self.addItem(item)

    def selected_paths(self):
        return [item.text() for item in self.selectedItems()]

    def all_paths(self):
        return [self.item(i).text() for i in range(self.count())]


class WatermarkWorker(QThread):
    progress_changed = pyqtSignal(int)
    status_changed = pyqtSignal(str)
    error_occurred = pyqtSignal(str)
    finished_success = pyqtSignal(str)

    def __init__(self, files, options, parent=None):
        super().__init__(parent)
        self.files = files
        self.options = options
        self._font_cache = {}

    def run(self):
        if PdfReader is None or PdfWriter is None:
            self.error_occurred.emit("缺少 PyPDF2 库,请先安装 PyPDF2。")
            return
        try:
            output_dir = os.path.join(os.getcwd(), "output")
            os.makedirs(output_dir, exist_ok=True)
        except Exception as exc:
            self.error_occurred.emit(f"无法创建输出目录:{exc}")
            return

        total = len(self.files)
        for index, file_path in enumerate(self.files, start=1):
            try:
                self.status_changed.emit(f"处理中:{os.path.basename(file_path)}")
                output_name = f"{os.path.splitext(os.path.basename(file_path))[0]}_watermarked.pdf"
                output_path = unique_output_path(output_dir, output_name)
                reader = PdfReader(file_path)
                writer = PdfWriter()
                for page in reader.pages:
                    width = float(page.mediabox.width)
                    height = float(page.mediabox.height)
                    watermark_pdf = create_watermark_pdf_buffer(
                        width,
                        height,
                        self.options,
                        self._font_cache,
                    )
                    watermark_reader = PdfReader(watermark_pdf)
                    watermark_page = watermark_reader.pages[0]
                    page.merge_page(watermark_page)
                    writer.add_page(page)
                with open(output_path, "wb") as output_file:
                    writer.write(output_file)
                progress = int(index / total * 100)
                self.progress_changed.emit(progress)
            except PermissionError:
                self.error_occurred.emit("文件权限不足,无法读写 PDF。")
                return
            except Exception as exc:
                self.error_occurred.emit(f"处理失败:{exc}")
                return

        self.progress_changed.emit(100)
        self.status_changed.emit("处理完成")
        self.finished_success.emit(output_dir)


class PreviewWorker(QThread):
    status_changed = pyqtSignal(str)
    error_occurred = pyqtSignal(str)
    preview_ready = pyqtSignal(str)

    def __init__(self, file_path, options, parent=None):
        super().__init__(parent)
        self.file_path = file_path
        self.options = options
        self._font_cache = {}

    def run(self):
        if PdfReader is None or PdfWriter is None:
            self.error_occurred.emit("缺少 PyPDF2 库,请先安装 PyPDF2。")
            return
        try:
            output_dir = os.path.join(os.getcwd(), "output")
            os.makedirs(output_dir, exist_ok=True)
        except Exception as exc:
            self.error_occurred.emit(f"无法创建输出目录:{exc}")
            return
        try:
            self.status_changed.emit("生成预览中")
            reader = PdfReader(self.file_path)
            if not reader.pages:
                self.error_occurred.emit("PDF 内容为空,无法预览。")
                return
            writer = PdfWriter()
            page = reader.pages[0]
            width = float(page.mediabox.width)
            height = float(page.mediabox.height)
            watermark_pdf = create_watermark_pdf_buffer(
                width,
                height,
                self.options,
                self._font_cache,
            )
            watermark_reader = PdfReader(watermark_pdf)
            watermark_page = watermark_reader.pages[0]
            page.merge_page(watermark_page)
            writer.add_page(page)
            base_name = os.path.splitext(os.path.basename(self.file_path))[0]
            output_name = f"{base_name}_preview.pdf"
            output_path = unique_output_path(output_dir, output_name)
            with open(output_path, "wb") as output_file:
                writer.write(output_file)
            self.preview_ready.emit(output_path)
        except PermissionError:
            self.error_occurred.emit("文件权限不足,无法读写 PDF。")
        except Exception as exc:
            self.error_occurred.emit(f"预览失败:{exc}")


class WatermarkApp(QWidget):
    def __init__(self):
        super().__init__()
        self.worker = None
        self.preview_worker = None
        self.init_ui()

    def init_ui(self):
        self.setWindowTitle("PDF 水印工具")
        self.resize(1100, 650)

        main_layout = QVBoxLayout(self)
        content_layout = QHBoxLayout()
        main_layout.addLayout(content_layout)

        left_layout = QVBoxLayout()
        content_layout.addLayout(left_layout, 3)

        self.pdf_list = PdfListWidget()
        self.pdf_list.setToolTip("拖拽或通过按钮添加 PDF 文件")
        left_layout.addWidget(self.pdf_list)

        list_button_layout = QHBoxLayout()
        left_layout.addLayout(list_button_layout)

        self.add_button = QPushButton("添加 PDF")
        self.add_button.setToolTip("选择并添加 PDF 文件")
        self.add_button.clicked.connect(self.add_files)
        list_button_layout.addWidget(self.add_button)

        self.remove_button = QPushButton("删除选中")
        self.remove_button.setToolTip("删除列表中选中的 PDF 文件")
        self.remove_button.clicked.connect(self.remove_selected)
        list_button_layout.addWidget(self.remove_button)

        right_layout = QVBoxLayout()
        content_layout.addLayout(right_layout, 4)

        self.text_group = QGroupBox("水印文字")
        self.text_group.setToolTip("设置水印显示的文字内容")
        text_layout = QVBoxLayout()
        self.text_input = QLineEdit("python-小庄办公")
        self.text_input.setToolTip("输入水印文字内容")
        self.text_input.textChanged.connect(self.update_preview)
        text_layout.addWidget(self.text_input)
        self.text_group.setLayout(text_layout)
        right_layout.addWidget(self.text_group)

        self.font_group = QGroupBox("字体选择器")
        self.font_group.setToolTip("选择系统已安装字体")
        font_layout = QVBoxLayout()
        self.font_combo = QComboBox()
        self.font_combo.setToolTip("从系统字体列表中选择字体")
        font_layout.addWidget(self.font_combo)
        self.font_group.setLayout(font_layout)
        right_layout.addWidget(self.font_group)

        self.spacing_group = QGroupBox("水印行间距")
        self.spacing_group.setToolTip("调整水印纵向间距")
        spacing_layout = QHBoxLayout()
        self.spacing_spin = QSpinBox()
        self.spacing_spin.setToolTip("设置水印行间距百分比")
        self.spacing_spin.setRange(0, 200)
        self.spacing_spin.setSingleStep(1)
        self.spacing_spin.setSuffix(" %")
        self.spacing_spin.setValue(100)
        self.spacing_spin.valueChanged.connect(self.update_preview)
        spacing_layout.addWidget(self.spacing_spin)
        self.spacing_group.setLayout(spacing_layout)
        right_layout.addWidget(self.spacing_group)

        self.size_group = QGroupBox("水印字体大小")
        self.size_group.setToolTip("设置水印字体大小")
        size_layout = QHBoxLayout()
        self.size_spin = QSpinBox()
        self.size_spin.setToolTip("设置水印字体大小(pt)")
        self.size_spin.setRange(8, 200)
        self.size_spin.setSingleStep(1)
        self.size_spin.setSuffix(" pt")
        self.size_spin.setValue(36)
        self.size_spin.valueChanged.connect(self.update_preview)
        size_layout.addWidget(self.size_spin)
        self.size_group.setLayout(size_layout)
        right_layout.addWidget(self.size_group)

        self.angle_group = QGroupBox("旋转角度")
        self.angle_group.setToolTip("调整水印旋转角度")
        angle_layout = QHBoxLayout()
        self.angle_slider = QSlider(Qt.Horizontal)
        self.angle_slider.setToolTip("拖动调整旋转角度")
        self.angle_slider.setRange(-90, 90)
        self.angle_slider.setSingleStep(1)
        self.angle_slider.setValue(0)
        self.angle_spin = QSpinBox()
        self.angle_spin.setToolTip("输入旋转角度")
        self.angle_spin.setRange(-90, 90)
        self.angle_spin.setSingleStep(1)
        self.angle_spin.setValue(0)
        angle_layout.addWidget(self.angle_slider)
        angle_layout.addWidget(self.angle_spin)
        self.angle_group.setLayout(angle_layout)
        right_layout.addWidget(self.angle_group)

        self.opacity_group = QGroupBox("透明度")
        self.opacity_group.setToolTip("调整水印透明度")
        opacity_layout = QHBoxLayout()
        self.opacity_slider = QSlider(Qt.Horizontal)
        self.opacity_slider.setToolTip("拖动调整透明度")
        self.opacity_slider.setRange(0, 100)
        self.opacity_slider.setSingleStep(1)
        self.opacity_slider.setValue(30)
        self.opacity_spin = QSpinBox()
        self.opacity_spin.setToolTip("输入透明度百分比")
        self.opacity_spin.setRange(0, 100)
        self.opacity_spin.setSingleStep(1)
        self.opacity_spin.setValue(30)
        opacity_layout.addWidget(self.opacity_slider)
        opacity_layout.addWidget(self.opacity_spin)
        self.opacity_group.setLayout(opacity_layout)
        right_layout.addWidget(self.opacity_group)

        self.preview_group = QGroupBox("预览")
        self.preview_group.setToolTip("实时预览水印效果")
        preview_layout = QVBoxLayout()
        self.preview_label = QLabel()
        self.preview_label.setToolTip("显示水印旋转与透明度预览")
        self.preview_label.setMinimumHeight(140)
        self.preview_label.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        self.preview_label.setAlignment(Qt.AlignCenter)
        preview_layout.addWidget(self.preview_label)
        self.preview_group.setLayout(preview_layout)
        right_layout.addWidget(self.preview_group)

        right_layout.addStretch()

        bottom_layout = QHBoxLayout()
        main_layout.addLayout(bottom_layout)

        self.start_button = QPushButton("开始处理")
        self.start_button.setToolTip("开始给列表中的 PDF 添加水印")
        self.start_button.clicked.connect(self.start_processing)
        bottom_layout.addWidget(self.start_button)

        self.preview_button = QPushButton("预览效果")
        self.preview_button.setToolTip("生成当前参数下的水印预览文件")
        self.preview_button.clicked.connect(self.start_preview)
        bottom_layout.addWidget(self.preview_button)

        self.open_output_button = QPushButton("打开输出目录")
        self.open_output_button.setToolTip("打开输出文件夹")
        self.open_output_button.clicked.connect(self.open_output_dir)
        bottom_layout.addWidget(self.open_output_button)

        self.progress_bar = QProgressBar()
        self.progress_bar.setToolTip("显示处理进度")
        self.progress_bar.setValue(0)
        bottom_layout.addWidget(self.progress_bar, 3)

        self.status_label = QLabel("就绪")
        self.status_label.setToolTip("显示当前处理状态")
        bottom_layout.addWidget(self.status_label, 2)

        self.load_fonts()
        self.bind_signals()
        self.update_preview()

    def load_fonts(self):
        db = QFontDatabase()
        families = db.families()
        self.font_combo.addItems(families)
        default_font = "宋体"
        if default_font in families:
            self.font_combo.setCurrentText(default_font)
        elif families:
            self.font_combo.setCurrentIndex(0)
        self.font_combo.currentTextChanged.connect(self.update_preview)

    def bind_signals(self):
        self.angle_slider.valueChanged.connect(self.angle_spin.setValue)
        self.angle_spin.valueChanged.connect(self.angle_slider.setValue)
        self.angle_slider.valueChanged.connect(self.update_preview)
        self.angle_spin.valueChanged.connect(self.update_preview)

        self.opacity_slider.valueChanged.connect(self.opacity_spin.setValue)
        self.opacity_spin.valueChanged.connect(self.opacity_slider.setValue)
        self.opacity_slider.valueChanged.connect(self.update_preview)
        self.opacity_spin.valueChanged.connect(self.update_preview)

    def update_preview(self):
        width = max(self.preview_label.width(), 300)
        height = max(self.preview_label.height(), 140)
        pixmap = QPixmap(width, height)
        pixmap.fill(Qt.white)
        painter = QPainter(pixmap)
        painter.setRenderHint(QPainter.Antialiasing)
        font = QFont(self.font_combo.currentText(), self.size_spin.value())
        painter.setFont(font)
        painter.setOpacity(self.opacity_spin.value() / 100)
        painter.translate(width / 2, height / 2)
        painter.rotate(self.angle_spin.value())
        painter.drawText(-width // 4, 0, self.text_input.text())
        painter.end()
        self.preview_label.setPixmap(pixmap)

    def add_files(self):
        files, _ = QFileDialog.getOpenFileNames(
            self,
            "选择 PDF 文件",
            "",
            "PDF 文件 (*.pdf)",
        )
        for file_path in files:
            self.pdf_list.add_pdf_item(file_path)

    def remove_selected(self):
        for item in self.pdf_list.selectedItems():
            row = self.pdf_list.row(item)
            self.pdf_list.takeItem(row)

    def start_processing(self):
        files = self.pdf_list.all_paths()
        if not files:
            QMessageBox.warning(self, "提示", "请先添加需要处理的 PDF 文件。")
            return
        if PdfReader is None or PdfWriter is None:
            QMessageBox.warning(self, "提示", "未安装 PyPDF2,无法处理 PDF。")
            return
        font_path = find_font_file(self.font_combo.currentText())
        if not font_path or not os.path.exists(font_path):
            QMessageBox.warning(self, "提示", "所选字体文件缺失,请更换字体。")
            return

        options = {
            "text": self.text_input.text().strip() or " ",
            "font_family": self.font_combo.currentText(),
            "line_spacing": self.spacing_spin.value(),
            "font_size": self.size_spin.value(),
            "angle": self.angle_spin.value(),
            "opacity": self.opacity_spin.value(),
        }

        self.progress_bar.setValue(0)
        self.status_label.setText("准备处理")
        self.start_button.setEnabled(False)
        self.preview_button.setEnabled(False)
        self.worker = WatermarkWorker(files, options)
        self.worker.progress_changed.connect(self.progress_bar.setValue)
        self.worker.status_changed.connect(self.status_label.setText)
        self.worker.error_occurred.connect(self.handle_error)
        self.worker.finished_success.connect(self.handle_success)
        self.worker.start()

    def handle_error(self, message):
        self.start_button.setEnabled(True)
        self.preview_button.setEnabled(True)
        self.status_label.setText("处理失败")
        QMessageBox.warning(self, "错误", message)

    def handle_success(self, output_dir):
        self.start_button.setEnabled(True)
        self.preview_button.setEnabled(True)
        self.status_label.setText("处理完成")
        QMessageBox.information(self, "完成", f"水印处理完成,输出目录:{output_dir}")

    def start_preview(self):
        files = self.pdf_list.selected_paths() or self.pdf_list.all_paths()
        if not files:
            QMessageBox.warning(self, "提示", "请先添加需要预览的 PDF 文件。")
            return
        if PdfReader is None or PdfWriter is None:
            QMessageBox.warning(self, "提示", "未安装 PyPDF2,无法预览。")
            return
        font_path = find_font_file(self.font_combo.currentText())
        if not font_path or not os.path.exists(font_path):
            QMessageBox.warning(self, "提示", "所选字体文件缺失,请更换字体。")
            return

        options = {
            "text": self.text_input.text().strip() or " ",
            "font_family": self.font_combo.currentText(),
            "line_spacing": self.spacing_spin.value(),
            "font_size": self.size_spin.value(),
            "angle": self.angle_spin.value(),
            "opacity": self.opacity_spin.value(),
        }

        self.status_label.setText("生成预览")
        self.preview_button.setEnabled(False)
        self.start_button.setEnabled(False)
        self.preview_worker = PreviewWorker(files[0], options)
        self.preview_worker.status_changed.connect(self.status_label.setText)
        self.preview_worker.error_occurred.connect(self.handle_error)
        self.preview_worker.preview_ready.connect(self.handle_preview_ready)
        self.preview_worker.start()

    def handle_preview_ready(self, preview_path):
        self.start_button.setEnabled(True)
        self.preview_button.setEnabled(True)
        self.status_label.setText("预览完成")
        try:
            os.startfile(preview_path)
        except Exception as exc:
            QMessageBox.warning(self, "提示", f"无法打开预览文件:{exc}")

    def open_output_dir(self):
        output_dir = os.path.join(os.getcwd(), "output")
        if not os.path.exists(output_dir):
            QMessageBox.warning(self, "提示", "输出目录不存在,请先处理文件。")
            return
        try:
            os.startfile(output_dir)
        except Exception as exc:
            QMessageBox.warning(self, "提示", f"无法打开输出目录:{exc}")


def main():
    app = QApplication(sys.argv)
    window = WatermarkApp()
    window.show()
    sys.exit(app.exec_())


if __name__ == "__main__":
    main()


以上就是Python+PyQt5实现PDF批量水印工具的详细内容,更多关于Python PDF批量水印的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文