Python+PyQt5实现PDF批量水印工具
作者:小庄-Python办公
PyQt5 的 PDF 批量水印工具:
本文围绕 PDF添加水印.py,完整讲解如何将单文件水印脚本重构为可交互的 PyQt5 桌面应用,支持拖拽导入、实时预览、子线程处理与输出管理,并给出关键实现细节与结构说明。

一、功能概览
这款工具面向“批量给 PDF 添加文字水印”的场景,核心功能如下:
- 支持拖拽或选择多个 PDF 文件
- 参数可配置:字体、字号、行间距、旋转角度、透明度
- 实时预览旋转与透明度效果
- 使用子线程处理,避免界面卡顿
- 输出目录统一为
./output/并自动重命名 - 常见错误(空列表、字体缺失、权限不足)弹窗提示
二、整体架构与模块拆分
应用主要分为三层:
UI 层(WatermarkApp)
负责界面布局、参数采集、按钮交互、预览触发、状态展示等。
处理层(WatermarkWorker / PreviewWorker)
使用 QThread 执行批量水印与预览生成,避免阻塞主线程。
工具层(字体解析 / 输出重名 / 水印生成)
封装公共逻辑,确保主流程清晰且复用性强。
三、界面布局设计
界面布局遵循“左列表、右参数、底部操作区”的结构。
1. 左侧:PDF 文件列表 + 添加/删除
- 列表控件支持拖拽(继承
QListWidget重写拖拽事件) - 添加按钮打开文件对话框
- 删除按钮移除选中项
2. 右侧:参数面板(按顺序分组)
- 水印文字:
QLineEdit - 字体选择:
QComboBox(系统字体列表) - 行间距:
QSpinBox(0–200%) - 字号:
QSpinBox(8–200pt) - 角度:
QSlider + QSpinBox(-90–90°) - 透明度:
QSlider + QSpinBox(0–100%) - 预览:
QLabel显示动态渲染图
3. 底部:操作按钮 + 进度/状态
- “开始处理”
- “预览效果”
- “打开输出目录”
- 进度条 + 状态提示文本
四、拖拽导入实现
自定义 PdfListWidget,重写拖拽相关事件:
dragEnterEvent/dragMoveEvent:校验 URLdropEvent:过滤.pdf文件并添加到列表
这样用户可以直接把 PDF 拖进列表,极大提升效率。
五、实时预览机制
预览的核心思想是:
- 使用当前 UI 参数生成一个水印 PDF
- 将水印合成到原 PDF 的第一页
- 保存为临时预览文件并自动打开
实现上通过 PreviewWorker(QThread) 执行生成逻辑,以避免 UI 卡顿。
预览按钮会优先使用“选中项”,若未选中则默认使用第一个 PDF。
六、子线程批量处理
处理逻辑放在 WatermarkWorker(QThread) 中完成:
- 读取每个 PDF
- 对每一页合成水印
- 保存到
./output目录 - 通过信号更新进度条与状态文本
七、水印生成逻辑
水印生成使用 ReportLab,在内存中创建一页 PDF:
- 读取当前页面大小(确保适配不同尺寸 PDF)
- 在中心点平移 + 旋转
- 按固定网格绘制水印文本
- 输出为
BytesIO内存对象
关键点:
- 行间距与透明度全部来自 UI 参数
- 字体通过系统字体注册,缺失时抛错
八、字体选择与注册
字体下拉框基于系统字体列表:
QFontDatabase().families()获取可用字体- 默认选“宋体”
字体注册则通过 Windows 注册表查询真实字体文件路径,然后用 ReportLab 注册:
- 在注册表中查找字体名称
- 拼接字体文件路径
pdfmetrics.registerFont(TTFont(...))
这样既能保证预览字体一致,也能保证 PDF 输出字体可用。
九、输出策略与重名机制
输出统一保存到:
./output/原文件名_watermarked.pdf
如果文件已存在,会自动追加序号:
xxx_watermarked_1.pdf xxx_watermarked_2.pdf
这一逻辑由 unique_output_path 负责,避免覆盖历史结果。
十、错误处理与弹窗提示
以下情况会触发弹窗警告:
- PDF 列表为空
- 字体文件缺失
- 没有读写权限
- 缺少 PyPDF2 依赖
这样可以避免用户在无感知情况下操作失败。
十一、运行方式
直接运行脚本即可启动 GUI:
python d:\测试\公众号水文\11-PDF添加水印\PDF添加水印.py
如需打包为 EXE,可使用 PyInstaller(另行说明)。
十二、总结与扩展建议
当前版本已经具备完整的批量水印能力,并具备以下优势:
- 参数全可配置
- UI 友好
- 处理不卡顿
- 输出安全可靠
后续可考虑的扩展方向:
- 增加颜色选择
- 自定义横向/纵向密度
- 多行预览与缩略图预览
- 输出目录可自定义
如需进一步增强功能或增加更多参数,我可以继续扩展优化。
完整代码
import io
import os
import sys
import winreg
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QFont, QFontDatabase, QPainter, QPixmap
from PyQt5.QtWidgets import (
QApplication,
QFileDialog,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QListWidgetItem,
QMessageBox,
QPushButton,
QProgressBar,
QSlider,
QSpinBox,
QDoubleSpinBox,
QVBoxLayout,
QWidget,
QComboBox,
QAbstractItemView,
QSizePolicy,
)
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
try:
from PyPDF2 import PdfReader, PdfWriter
except Exception:
PdfReader = None
PdfWriter = None
def find_font_file(font_family):
fonts_dir = os.path.join(os.environ.get("WINDIR", "C:\\Windows"), "Fonts")
try:
with winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE,
r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Fonts",
) as key:
index = 0
while True:
try:
name, data, _ = winreg.EnumValue(key, index)
index += 1
except OSError:
break
if font_family.lower() in name.lower():
if os.path.isabs(data):
return data
return os.path.join(fonts_dir, data)
except OSError:
pass
return None
def unique_output_path(output_dir, base_name):
output_path = os.path.join(output_dir, base_name)
if not os.path.exists(output_path):
return output_path
name, ext = os.path.splitext(base_name)
counter = 1
while True:
candidate = os.path.join(output_dir, f"{name}_{counter}{ext}")
if not os.path.exists(candidate):
return candidate
counter += 1
def get_reportlab_font(font_family, font_cache):
if font_family in font_cache:
return font_cache[font_family]
font_path = find_font_file(font_family)
if not font_path or not os.path.exists(font_path):
raise FileNotFoundError(f"字体文件缺失:{font_family}")
font_key = f"font_{len(font_cache)}"
pdfmetrics.registerFont(TTFont(font_key, font_path))
font_cache[font_family] = font_key
return font_key
def create_watermark_pdf_buffer(width, height, options, font_cache):
buffer = io.BytesIO()
watermark_canvas = canvas.Canvas(buffer, pagesize=(width, height))
font_name = get_reportlab_font(options["font_family"], font_cache)
watermark_canvas.setFont(font_name, options["font_size"])
watermark_canvas.setFillAlpha(options["opacity"] / 100)
watermark_canvas.translate(width / 2, height / 2)
watermark_canvas.rotate(options["angle"])
x_step = width / 5
y_step = height / 10 * (options["line_spacing"] / 100)
for i in range(5):
for j in range(10):
a = (i - 2) * x_step
b = (j - 4) * y_step
watermark_canvas.drawString(a, b, options["text"])
watermark_canvas.save()
buffer.seek(0)
return buffer
class PdfListWidget(QListWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.setAcceptDrops(True)
self.setSelectionMode(QAbstractItemView.ExtendedSelection)
def dragEnterEvent(self, event):
if event.mimeData().hasUrls():
event.acceptProposedAction()
else:
event.ignore()
def dragMoveEvent(self, event):
if event.mimeData().hasUrls():
event.acceptProposedAction()
else:
event.ignore()
def dropEvent(self, event):
if not event.mimeData().hasUrls():
return
for url in event.mimeData().urls():
path = url.toLocalFile()
if path.lower().endswith(".pdf"):
self.add_pdf_item(path)
def add_pdf_item(self, path):
for i in range(self.count()):
if self.item(i).text() == path:
return
item = QListWidgetItem(path)
self.addItem(item)
def selected_paths(self):
return [item.text() for item in self.selectedItems()]
def all_paths(self):
return [self.item(i).text() for i in range(self.count())]
class WatermarkWorker(QThread):
progress_changed = pyqtSignal(int)
status_changed = pyqtSignal(str)
error_occurred = pyqtSignal(str)
finished_success = pyqtSignal(str)
def __init__(self, files, options, parent=None):
super().__init__(parent)
self.files = files
self.options = options
self._font_cache = {}
def run(self):
if PdfReader is None or PdfWriter is None:
self.error_occurred.emit("缺少 PyPDF2 库,请先安装 PyPDF2。")
return
try:
output_dir = os.path.join(os.getcwd(), "output")
os.makedirs(output_dir, exist_ok=True)
except Exception as exc:
self.error_occurred.emit(f"无法创建输出目录:{exc}")
return
total = len(self.files)
for index, file_path in enumerate(self.files, start=1):
try:
self.status_changed.emit(f"处理中:{os.path.basename(file_path)}")
output_name = f"{os.path.splitext(os.path.basename(file_path))[0]}_watermarked.pdf"
output_path = unique_output_path(output_dir, output_name)
reader = PdfReader(file_path)
writer = PdfWriter()
for page in reader.pages:
width = float(page.mediabox.width)
height = float(page.mediabox.height)
watermark_pdf = create_watermark_pdf_buffer(
width,
height,
self.options,
self._font_cache,
)
watermark_reader = PdfReader(watermark_pdf)
watermark_page = watermark_reader.pages[0]
page.merge_page(watermark_page)
writer.add_page(page)
with open(output_path, "wb") as output_file:
writer.write(output_file)
progress = int(index / total * 100)
self.progress_changed.emit(progress)
except PermissionError:
self.error_occurred.emit("文件权限不足,无法读写 PDF。")
return
except Exception as exc:
self.error_occurred.emit(f"处理失败:{exc}")
return
self.progress_changed.emit(100)
self.status_changed.emit("处理完成")
self.finished_success.emit(output_dir)
class PreviewWorker(QThread):
status_changed = pyqtSignal(str)
error_occurred = pyqtSignal(str)
preview_ready = pyqtSignal(str)
def __init__(self, file_path, options, parent=None):
super().__init__(parent)
self.file_path = file_path
self.options = options
self._font_cache = {}
def run(self):
if PdfReader is None or PdfWriter is None:
self.error_occurred.emit("缺少 PyPDF2 库,请先安装 PyPDF2。")
return
try:
output_dir = os.path.join(os.getcwd(), "output")
os.makedirs(output_dir, exist_ok=True)
except Exception as exc:
self.error_occurred.emit(f"无法创建输出目录:{exc}")
return
try:
self.status_changed.emit("生成预览中")
reader = PdfReader(self.file_path)
if not reader.pages:
self.error_occurred.emit("PDF 内容为空,无法预览。")
return
writer = PdfWriter()
page = reader.pages[0]
width = float(page.mediabox.width)
height = float(page.mediabox.height)
watermark_pdf = create_watermark_pdf_buffer(
width,
height,
self.options,
self._font_cache,
)
watermark_reader = PdfReader(watermark_pdf)
watermark_page = watermark_reader.pages[0]
page.merge_page(watermark_page)
writer.add_page(page)
base_name = os.path.splitext(os.path.basename(self.file_path))[0]
output_name = f"{base_name}_preview.pdf"
output_path = unique_output_path(output_dir, output_name)
with open(output_path, "wb") as output_file:
writer.write(output_file)
self.preview_ready.emit(output_path)
except PermissionError:
self.error_occurred.emit("文件权限不足,无法读写 PDF。")
except Exception as exc:
self.error_occurred.emit(f"预览失败:{exc}")
class WatermarkApp(QWidget):
def __init__(self):
super().__init__()
self.worker = None
self.preview_worker = None
self.init_ui()
def init_ui(self):
self.setWindowTitle("PDF 水印工具")
self.resize(1100, 650)
main_layout = QVBoxLayout(self)
content_layout = QHBoxLayout()
main_layout.addLayout(content_layout)
left_layout = QVBoxLayout()
content_layout.addLayout(left_layout, 3)
self.pdf_list = PdfListWidget()
self.pdf_list.setToolTip("拖拽或通过按钮添加 PDF 文件")
left_layout.addWidget(self.pdf_list)
list_button_layout = QHBoxLayout()
left_layout.addLayout(list_button_layout)
self.add_button = QPushButton("添加 PDF")
self.add_button.setToolTip("选择并添加 PDF 文件")
self.add_button.clicked.connect(self.add_files)
list_button_layout.addWidget(self.add_button)
self.remove_button = QPushButton("删除选中")
self.remove_button.setToolTip("删除列表中选中的 PDF 文件")
self.remove_button.clicked.connect(self.remove_selected)
list_button_layout.addWidget(self.remove_button)
right_layout = QVBoxLayout()
content_layout.addLayout(right_layout, 4)
self.text_group = QGroupBox("水印文字")
self.text_group.setToolTip("设置水印显示的文字内容")
text_layout = QVBoxLayout()
self.text_input = QLineEdit("python-小庄办公")
self.text_input.setToolTip("输入水印文字内容")
self.text_input.textChanged.connect(self.update_preview)
text_layout.addWidget(self.text_input)
self.text_group.setLayout(text_layout)
right_layout.addWidget(self.text_group)
self.font_group = QGroupBox("字体选择器")
self.font_group.setToolTip("选择系统已安装字体")
font_layout = QVBoxLayout()
self.font_combo = QComboBox()
self.font_combo.setToolTip("从系统字体列表中选择字体")
font_layout.addWidget(self.font_combo)
self.font_group.setLayout(font_layout)
right_layout.addWidget(self.font_group)
self.spacing_group = QGroupBox("水印行间距")
self.spacing_group.setToolTip("调整水印纵向间距")
spacing_layout = QHBoxLayout()
self.spacing_spin = QSpinBox()
self.spacing_spin.setToolTip("设置水印行间距百分比")
self.spacing_spin.setRange(0, 200)
self.spacing_spin.setSingleStep(1)
self.spacing_spin.setSuffix(" %")
self.spacing_spin.setValue(100)
self.spacing_spin.valueChanged.connect(self.update_preview)
spacing_layout.addWidget(self.spacing_spin)
self.spacing_group.setLayout(spacing_layout)
right_layout.addWidget(self.spacing_group)
self.size_group = QGroupBox("水印字体大小")
self.size_group.setToolTip("设置水印字体大小")
size_layout = QHBoxLayout()
self.size_spin = QSpinBox()
self.size_spin.setToolTip("设置水印字体大小(pt)")
self.size_spin.setRange(8, 200)
self.size_spin.setSingleStep(1)
self.size_spin.setSuffix(" pt")
self.size_spin.setValue(36)
self.size_spin.valueChanged.connect(self.update_preview)
size_layout.addWidget(self.size_spin)
self.size_group.setLayout(size_layout)
right_layout.addWidget(self.size_group)
self.angle_group = QGroupBox("旋转角度")
self.angle_group.setToolTip("调整水印旋转角度")
angle_layout = QHBoxLayout()
self.angle_slider = QSlider(Qt.Horizontal)
self.angle_slider.setToolTip("拖动调整旋转角度")
self.angle_slider.setRange(-90, 90)
self.angle_slider.setSingleStep(1)
self.angle_slider.setValue(0)
self.angle_spin = QSpinBox()
self.angle_spin.setToolTip("输入旋转角度")
self.angle_spin.setRange(-90, 90)
self.angle_spin.setSingleStep(1)
self.angle_spin.setValue(0)
angle_layout.addWidget(self.angle_slider)
angle_layout.addWidget(self.angle_spin)
self.angle_group.setLayout(angle_layout)
right_layout.addWidget(self.angle_group)
self.opacity_group = QGroupBox("透明度")
self.opacity_group.setToolTip("调整水印透明度")
opacity_layout = QHBoxLayout()
self.opacity_slider = QSlider(Qt.Horizontal)
self.opacity_slider.setToolTip("拖动调整透明度")
self.opacity_slider.setRange(0, 100)
self.opacity_slider.setSingleStep(1)
self.opacity_slider.setValue(30)
self.opacity_spin = QSpinBox()
self.opacity_spin.setToolTip("输入透明度百分比")
self.opacity_spin.setRange(0, 100)
self.opacity_spin.setSingleStep(1)
self.opacity_spin.setValue(30)
opacity_layout.addWidget(self.opacity_slider)
opacity_layout.addWidget(self.opacity_spin)
self.opacity_group.setLayout(opacity_layout)
right_layout.addWidget(self.opacity_group)
self.preview_group = QGroupBox("预览")
self.preview_group.setToolTip("实时预览水印效果")
preview_layout = QVBoxLayout()
self.preview_label = QLabel()
self.preview_label.setToolTip("显示水印旋转与透明度预览")
self.preview_label.setMinimumHeight(140)
self.preview_label.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.preview_label.setAlignment(Qt.AlignCenter)
preview_layout.addWidget(self.preview_label)
self.preview_group.setLayout(preview_layout)
right_layout.addWidget(self.preview_group)
right_layout.addStretch()
bottom_layout = QHBoxLayout()
main_layout.addLayout(bottom_layout)
self.start_button = QPushButton("开始处理")
self.start_button.setToolTip("开始给列表中的 PDF 添加水印")
self.start_button.clicked.connect(self.start_processing)
bottom_layout.addWidget(self.start_button)
self.preview_button = QPushButton("预览效果")
self.preview_button.setToolTip("生成当前参数下的水印预览文件")
self.preview_button.clicked.connect(self.start_preview)
bottom_layout.addWidget(self.preview_button)
self.open_output_button = QPushButton("打开输出目录")
self.open_output_button.setToolTip("打开输出文件夹")
self.open_output_button.clicked.connect(self.open_output_dir)
bottom_layout.addWidget(self.open_output_button)
self.progress_bar = QProgressBar()
self.progress_bar.setToolTip("显示处理进度")
self.progress_bar.setValue(0)
bottom_layout.addWidget(self.progress_bar, 3)
self.status_label = QLabel("就绪")
self.status_label.setToolTip("显示当前处理状态")
bottom_layout.addWidget(self.status_label, 2)
self.load_fonts()
self.bind_signals()
self.update_preview()
def load_fonts(self):
db = QFontDatabase()
families = db.families()
self.font_combo.addItems(families)
default_font = "宋体"
if default_font in families:
self.font_combo.setCurrentText(default_font)
elif families:
self.font_combo.setCurrentIndex(0)
self.font_combo.currentTextChanged.connect(self.update_preview)
def bind_signals(self):
self.angle_slider.valueChanged.connect(self.angle_spin.setValue)
self.angle_spin.valueChanged.connect(self.angle_slider.setValue)
self.angle_slider.valueChanged.connect(self.update_preview)
self.angle_spin.valueChanged.connect(self.update_preview)
self.opacity_slider.valueChanged.connect(self.opacity_spin.setValue)
self.opacity_spin.valueChanged.connect(self.opacity_slider.setValue)
self.opacity_slider.valueChanged.connect(self.update_preview)
self.opacity_spin.valueChanged.connect(self.update_preview)
def update_preview(self):
width = max(self.preview_label.width(), 300)
height = max(self.preview_label.height(), 140)
pixmap = QPixmap(width, height)
pixmap.fill(Qt.white)
painter = QPainter(pixmap)
painter.setRenderHint(QPainter.Antialiasing)
font = QFont(self.font_combo.currentText(), self.size_spin.value())
painter.setFont(font)
painter.setOpacity(self.opacity_spin.value() / 100)
painter.translate(width / 2, height / 2)
painter.rotate(self.angle_spin.value())
painter.drawText(-width // 4, 0, self.text_input.text())
painter.end()
self.preview_label.setPixmap(pixmap)
def add_files(self):
files, _ = QFileDialog.getOpenFileNames(
self,
"选择 PDF 文件",
"",
"PDF 文件 (*.pdf)",
)
for file_path in files:
self.pdf_list.add_pdf_item(file_path)
def remove_selected(self):
for item in self.pdf_list.selectedItems():
row = self.pdf_list.row(item)
self.pdf_list.takeItem(row)
def start_processing(self):
files = self.pdf_list.all_paths()
if not files:
QMessageBox.warning(self, "提示", "请先添加需要处理的 PDF 文件。")
return
if PdfReader is None or PdfWriter is None:
QMessageBox.warning(self, "提示", "未安装 PyPDF2,无法处理 PDF。")
return
font_path = find_font_file(self.font_combo.currentText())
if not font_path or not os.path.exists(font_path):
QMessageBox.warning(self, "提示", "所选字体文件缺失,请更换字体。")
return
options = {
"text": self.text_input.text().strip() or " ",
"font_family": self.font_combo.currentText(),
"line_spacing": self.spacing_spin.value(),
"font_size": self.size_spin.value(),
"angle": self.angle_spin.value(),
"opacity": self.opacity_spin.value(),
}
self.progress_bar.setValue(0)
self.status_label.setText("准备处理")
self.start_button.setEnabled(False)
self.preview_button.setEnabled(False)
self.worker = WatermarkWorker(files, options)
self.worker.progress_changed.connect(self.progress_bar.setValue)
self.worker.status_changed.connect(self.status_label.setText)
self.worker.error_occurred.connect(self.handle_error)
self.worker.finished_success.connect(self.handle_success)
self.worker.start()
def handle_error(self, message):
self.start_button.setEnabled(True)
self.preview_button.setEnabled(True)
self.status_label.setText("处理失败")
QMessageBox.warning(self, "错误", message)
def handle_success(self, output_dir):
self.start_button.setEnabled(True)
self.preview_button.setEnabled(True)
self.status_label.setText("处理完成")
QMessageBox.information(self, "完成", f"水印处理完成,输出目录:{output_dir}")
def start_preview(self):
files = self.pdf_list.selected_paths() or self.pdf_list.all_paths()
if not files:
QMessageBox.warning(self, "提示", "请先添加需要预览的 PDF 文件。")
return
if PdfReader is None or PdfWriter is None:
QMessageBox.warning(self, "提示", "未安装 PyPDF2,无法预览。")
return
font_path = find_font_file(self.font_combo.currentText())
if not font_path or not os.path.exists(font_path):
QMessageBox.warning(self, "提示", "所选字体文件缺失,请更换字体。")
return
options = {
"text": self.text_input.text().strip() or " ",
"font_family": self.font_combo.currentText(),
"line_spacing": self.spacing_spin.value(),
"font_size": self.size_spin.value(),
"angle": self.angle_spin.value(),
"opacity": self.opacity_spin.value(),
}
self.status_label.setText("生成预览")
self.preview_button.setEnabled(False)
self.start_button.setEnabled(False)
self.preview_worker = PreviewWorker(files[0], options)
self.preview_worker.status_changed.connect(self.status_label.setText)
self.preview_worker.error_occurred.connect(self.handle_error)
self.preview_worker.preview_ready.connect(self.handle_preview_ready)
self.preview_worker.start()
def handle_preview_ready(self, preview_path):
self.start_button.setEnabled(True)
self.preview_button.setEnabled(True)
self.status_label.setText("预览完成")
try:
os.startfile(preview_path)
except Exception as exc:
QMessageBox.warning(self, "提示", f"无法打开预览文件:{exc}")
def open_output_dir(self):
output_dir = os.path.join(os.getcwd(), "output")
if not os.path.exists(output_dir):
QMessageBox.warning(self, "提示", "输出目录不存在,请先处理文件。")
return
try:
os.startfile(output_dir)
except Exception as exc:
QMessageBox.warning(self, "提示", f"无法打开输出目录:{exc}")
def main():
app = QApplication(sys.argv)
window = WatermarkApp()
window.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()
以上就是Python+PyQt5实现PDF批量水印工具的详细内容,更多关于Python PDF批量水印的资料请关注脚本之家其它相关文章!
