python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python发送邮件

Python实现批量发送邮件脚本

作者:起风了___

这篇文章主要为大家详细介绍了如何使用Python实现批量发送邮件脚本,可以使用指定的邮件模版批量发送邮件,感兴趣的小伙伴可以了解下

简介

闲来无事,帮朋友写个邮件发送脚本,顺便拿来写篇技术文,一举两得。脚本从指定的 excel 文件读取客户姓名、邮箱等信息,使用指定的邮件模版批量发送邮件。由于朋友使用文本模版就能达成要求,所以脚本虽然支持 Html 邮件模版,但并未测试。有自定义样式的朋友还需自行调试代码。 备注:本人测试是使用 163 邮箱

依赖

配置信息

邮箱配置

class EmailConfig:
    """
    邮箱配置
    """
    # 你的邮箱
    user: str = '138xxxx8888@163.com'
    # 邮箱授权码
    password: str = 'TXYuclse932VzHJ'
    # 邮箱服务器
    host: str = 'smtp.163.com'
    # 端口
    port: int = '587'

发件人信息

class SenderInfo:
    """
    发件人信息
    """
    # 显示给客户的发件人名称
    name: str = '起风了-dev'
    # 职位
    position: str = '开发人员'
    # 公司
    company: str = 'xxx科技有限责任公司'
    # 邮箱
    email: str = '138xxxx8888@163.com'
    # 签名
    signature: str = '签名测试'

    def get_subject(self, client: dict):
        """
        获取邮件主题
        :param client: 客户信息, 与客户信息文件内一致
        :return: 邮件主题
        """
        return f"{self.name}致{client['name']}的邮件"

    def to_json(self):
        return {
            'name': self.name,
            'position': self.position,
            'company': self.company,
            'email': self.email,
            'signature': self.signature
        }

发送设置

class SendSettings:
    """
    发送设置
    """
    # 邮件间隔(秒),避免被限制
    delay_between_emails: int = 2
    # 测试模式(只发给自己)
    test_mode: bool = False
    # 每批最多发送量
    max_emails_per_batch: int = 20

加载客户数据

excel 文件格式

indexnameemail
1起风了xxxx@qq.com

加载客户数据时会检验客户姓名和邮箱是否存在

返回数据结构: [{index: 1,name:'起风了', email: 'xxxx@qq.com'},{index: 2, name:'', email: ''}]

    def load_clients(self) -> list[dict]:
        """加载客户数据"""
        df = pd.read_excel(self.client_data_file)
        client_list = df.to_dict(orient='records')
        if not client_list:
            raise ValueError("客户数据为空")
        # 客户数据校验并清洗
        for client in client_list:
            if not client['email']:
                raise ValueError(f"客户 {json.dumps(client, ensure_ascii=False)} 的邮箱为空")
            if not client['name']:
                raise ValueError(f"客户 {json.dumps(client, ensure_ascii=False)} 的姓名为空")
            client['name'] = client['name'].strip()
            client['email'] = client['email'].strip()
        print(f"已加载 {len(client_list)} 个客户数据")
        return client_list

加载模版数据

模版从文件内加载

    def load_template(self):
        """加载邮件模板"""
        template = None
        if self.template_file:
            with open(self.template_file, 'r', encoding='utf-8') as f:
                template = Template(f.read())
        txt_template = None
        if self.txt_template_file:
            with open(self.txt_template_file, 'r', encoding='utf-8') as f:
                txt_template = f.read()
        return template, txt_template

转换 HTML 到纯文本

如果只指定了 HTML 模版,会使用这个函数转换一份纯文本邮件出来,HTML 模版本人并未测试,有相应需求的朋友可自行调试代码

    def _convert_html_to_text(self, html):
        """
        智能转换HTML到纯文本
        保留重要格式(列表、段落、强调等)
        """
        # 自定义转换规则
        replacements = [
            (r'<br\s*/?>', '\n'),
            (r'<p.*?>', '\n\n'),
            (r'</p>', ''),
            (r'<li.*?>', '• '),
            (r'</li>', '\n'),
            (r'<strong.*?>', '*'),
            (r'</strong>', '*'),
            (r'<em.*?>', '_'),
            (r'</em>', '_'),
            (r'<h[1-6].*?>', '\n\n'),
            (r'</h[1-6]>', '\n\n'),
            (r'<[^>]+>', ''),  # 移除其他所有HTML标签
        ]

        text = html
        for pattern, replacement in replacements:
            text = re.sub(pattern, replacement, text)

        # 清理多余的空行
        text = re.sub(r'\n\s*\n\s*\n', '\n\n', text)
        print(f'_convert_html_to_text:{text}')
        return text.strip()

邮件内容组织

在这个函数中将模版中的占位符替换为指定数据

Html 模版内容替换方式请移步学习 jinja2 库

    def personalize_content(self, client, template, txt_template: str):
        """个性化邮件内容"""

        html_content = template.render(client=client, sender=self.sender_info.to_json()) if template else None

        if txt_template:
            txt_content = txt_template
            for key in client.keys():
                txt_content = txt_content.replace('{client_' + key + '}', str(client[key]))
            sender_json = self.sender_info.to_json()
            for key in sender_json:
                txt_content = txt_content.replace('{sender_' + key + '}', str(sender_json[key]))
        else:
            txt_content = self._convert_html_to_text(html_content)

        return html_content, txt_content

发送邮件

这部分是邮件发送的核心代码,邮件主题从 SenderInfo 配置中获取,实现统一配置。 发送成功或失败都要记录日志,已备后续查看、核对

    def send_to_client(self, client, content):
        """发送给单个客户"""
        try:
            # 构建邮件
            subject = self.sender_info.get_subject(client=client)

            # 如果是测试模式,发给自己
            to_email = client['email']
            if self.send_settings.test_mode:
                to_email = self.email_config.user
                subject = f"[测试] {subject}"

            # 发送
            self.yag.send(
                to=to_email,
                subject=subject,
                contents=content,
                # attachments=['attachments/sample.pdf'],  # 可选附件
                headers={
                    # 'X-Priority': '1',  #  高优先级 ⚠️(显示感叹号)
                    'X-Priority': '3',  # 普通优先级(默认,不显示特殊标记)
                    # 'X-Priority': '5',  # 低优先级
                    'X-Mailer': 'PersonalEmailSender'
                }
            )

            # 记录日志
            log_entry = {
                'client': client['name'],
                'email': client['email'],
                'time': datetime.now().isoformat(),
                'status': '成功'
            }
            self.sent_log.append(log_entry)

            print(f"✓ 已发送给 {client['name']} <{client['email']}>")
            return True

        except Exception as e:
            print(f"✗ 发送失败 {client['name']}: {e}")
            self.sent_log.append({
                'client': client['name'],
                'email': client['email'],
                'time': datetime.now().isoformat(),
                'status': f'失败: {str(e)}'
            })
            return False

存储日志

运行完毕将日志写入文件

    def save_log(self):
        """保存发送记录"""
        if not self.sent_log:
            return

        log_df = pd.DataFrame(self.sent_log)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_file = f'sent_logs/sent_{timestamp}.csv'

        # 确保目录存在
        os.makedirs('sent_logs', exist_ok=True)

        log_df.to_csv(log_file, index=False, encoding='utf-8-sig')
        print(f"\n📋 发送记录已保存: {log_file}")

功能组合后的主运行函数

在此处整合以上介绍的各部分功能。

流程:加载客户 --> 加载模版 --> 分别为客户生成邮件内容 --> 内容校验 --> 发送邮件 --> 发送延迟 --> 保存日志 --> 发送结束

    def run(self):
        """主运行函数"""
        print("开始发送个性化邮件...\n")

        # 加载数据
        client_list = self.load_clients()
        template, txt_template = self.load_template()

        sent_count = 0
        total_clients = len(client_list)

        for i, client in enumerate(client_list, 1):
            print(f"\n[{i}/{total_clients}] 处理: {client['name']}")

            # 个性化内容
            html_content, text_content = self.personalize_content(client, template, txt_template)

            if not text_content:
                print(f"✗ 无内容可发送给 {client['name']}")
                # 记录日志
                log_entry = {
                    'client': client['name'],
                    'email': client['email'],
                    'time': datetime.now().isoformat(),
                    'status': '失败',
                    '原因': '无内容'
                }
                self.sent_log.append(log_entry)
                continue

            content = [text_content, html_content] if html_content else text_content
            # 发送
            if self.send_to_client(client, content):
                sent_count += 1

            # 延迟,避免被限制
            if i < total_clients:
                delay = self.send_settings.delay_between_emails
                print(f"等待 {delay} 秒...")
                time.sleep(delay)

            # 分批暂停(如果发送量大)
            if i % self.send_settings.max_emails_per_batch == 0:
                print(f"\n已发送 {i} 封,暂停 60 秒...")
                time.sleep(60)

        # 保存日志
        self.save_log()

        print(f"\n✅ 完成!成功发送 {sent_count}/{total_clients} 封邮件")

        # 关闭连接
        if self.yag:
            self.yag.close()

运行示例

if __name__ == "__main__":
    # 客户数据文件
    client_data_file = './clients/clients.xlsx'
    # 邮件模板文件
    txt_template_file = './templates/christmas.txt'
    sender = EmailSender(client_data_file=client_data_file,
                         txt_template_file=txt_template_file)
    sender.run()

附源码

# 配置信息
class EmailConfig:
    """
    邮箱配置
    """
    # 你的邮箱
    user: str = '138xxxx8888@163.com'
    # 邮箱授权码
    password: str = 'TXYuclse932VzHJ'
    # 邮箱服务器
    host: str = 'smtp.163.com'
    # 端口
    port: int = '587'
class SenderInfo:
    """
    发件人信息
    """
    # 显示给客户的发件人名称
    name: str = '起风了-dev'
    # 职位
    position: str = '开发人员'
    # 公司
    company: str = 'xxx科技有限责任公司'
    # 邮箱
    email: str = '138xxxx8888@163.com'
    # 签名
    signature: str = '签名测试'

    def get_subject(self, client: dict):
        """
        获取邮件主题
        :param client: 客户信息, 与客户信息文件内一致
        :return: 邮件主题
        """
        return f"{self.name}致{client['name']}的邮件"

    def to_json(self):
        return {
            'name': self.name,
            'position': self.position,
            'company': self.company,
            'email': self.email,
            'signature': self.signature
        }
class SendSettings:
    """
    发送设置
    """
    # 邮件间隔(秒),避免被限制
    delay_between_emails: int = 2
    # 测试模式(只发给自己)
    test_mode: bool = False
    # 每批最多发送量
    max_emails_per_batch: int = 20

# 主功能代码
import json
import re

import yagmail
import pandas as pd
import time
import os
from datetime import datetime
from jinja2 import Template
from config.config import EmailConfig, SenderInfo, SendSettings


class EmailSender:
    """邮件发送器"""

    def __init__(self, client_data_file: str, txt_template_file: str | None):
        """初始化"""
        # 客户数据
        self.client_data_file = client_data_file
        # 邮件模板
        self.template_file = None
        self.txt_template_file = txt_template_file
        # 邮件配置
        self.email_config = EmailConfig()
        self.sender_info = SenderInfo()
        self.send_settings = SendSettings()
        self.yag = yagmail.SMTP(
            user=self.email_config.user,
            password=self.email_config.password,
            host=self.email_config.host,
            port=self.email_config.port
        )
        self.sent_log = []

    def load_clients(self) -> list[dict]:
        """加载客户数据"""
        df = pd.read_excel(self.client_data_file)
        client_list = df.to_dict(orient='records')
        if not client_list:
            raise ValueError("客户数据为空")
        # 客户数据校验并清洗
        for client in client_list:
            if not client['email']:
                raise ValueError(f"客户 {json.dumps(client, ensure_ascii=False)} 的邮箱为空")
            if not client['name']:
                raise ValueError(f"客户 {json.dumps(client, ensure_ascii=False)} 的姓名为空")
            client['name'] = client['name'].strip()
            client['email'] = client['email'].strip()
        print(f"已加载 {len(client_list)} 个客户数据")
        return client_list

    def load_template(self):
        """加载邮件模板"""
        template = None
        if self.template_file:
            with open(self.template_file, 'r', encoding='utf-8') as f:
                template = Template(f.read())
        txt_template = None
        if self.txt_template_file:
            with open(self.txt_template_file, 'r', encoding='utf-8') as f:
                txt_template = f.read()
        return template, txt_template

    def _convert_html_to_text(self, html):
        """
        智能转换HTML到纯文本
        保留重要格式(列表、段落、强调等)
        """
        # 自定义转换规则
        replacements = [
            (r'<br\s*/?>', '\n'),
            (r'<p.*?>', '\n\n'),
            (r'</p>', ''),
            (r'<li.*?>', '• '),
            (r'</li>', '\n'),
            (r'<strong.*?>', '*'),
            (r'</strong>', '*'),
            (r'<em.*?>', '_'),
            (r'</em>', '_'),
            (r'<h[1-6].*?>', '\n\n'),
            (r'</h[1-6]>', '\n\n'),
            (r'<[^>]+>', ''),  # 移除其他所有HTML标签
        ]

        text = html
        for pattern, replacement in replacements:
            text = re.sub(pattern, replacement, text)

        # 清理多余的空行
        text = re.sub(r'\n\s*\n\s*\n', '\n\n', text)
        print(f'_convert_html_to_text:{text}')
        return text.strip()

    def personalize_content(self, client, template, txt_template: str):
        """个性化邮件内容"""

        html_content = template.render(client=client, sender=self.sender_info.to_json()) if template else None

        if txt_template:
            txt_content = txt_template
            for key in client.keys():
                txt_content = txt_content.replace('{client_' + key + '}', str(client[key]))
            sender_json = self.sender_info.to_json()
            for key in sender_json:
                txt_content = txt_content.replace('{sender_' + key + '}', str(sender_json[key]))
        else:
            txt_content = self._convert_html_to_text(html_content)

        return html_content, txt_content

    def send_to_client(self, client, content):
        """发送给单个客户"""
        try:
            # 构建邮件
            subject = self.sender_info.get_subject(client=client)

            # 如果是测试模式,发给自己
            to_email = client['email']
            if self.send_settings.test_mode:
                to_email = self.email_config.user
                subject = f"[测试] {subject}"

            # 发送
            self.yag.send(
                to=to_email,
                subject=subject,
                contents=content,
                # attachments=['attachments/sample.pdf'],  # 可选附件
                headers={
                    # 'X-Priority': '1',  #  高优先级 ⚠️(显示感叹号)
                    'X-Priority': '3',  # 普通优先级(默认,不显示特殊标记)
                    # 'X-Priority': '5',  # 低优先级
                    'X-Mailer': 'PersonalEmailSender'
                }
            )

            # 记录日志
            log_entry = {
                'client': client['name'],
                'email': client['email'],
                'time': datetime.now().isoformat(),
                'status': '成功'
            }
            self.sent_log.append(log_entry)

            print(f"✓ 已发送给 {client['name']} <{client['email']}>")
            return True

        except Exception as e:
            print(f"✗ 发送失败 {client['name']}: {e}")
            self.sent_log.append({
                'client': client['name'],
                'email': client['email'],
                'time': datetime.now().isoformat(),
                'status': f'失败: {str(e)}'
            })
            return False

    def save_log(self):
        """保存发送记录"""
        if not self.sent_log:
            return

        log_df = pd.DataFrame(self.sent_log)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_file = f'sent_logs/sent_{timestamp}.csv'

        # 确保目录存在
        os.makedirs('sent_logs', exist_ok=True)

        log_df.to_csv(log_file, index=False, encoding='utf-8-sig')
        print(f"\n📋 发送记录已保存: {log_file}")

    def run(self):
        """主运行函数"""
        print("开始发送个性化邮件...\n")

        # 加载数据
        client_list = self.load_clients()
        template, txt_template = self.load_template()

        sent_count = 0
        total_clients = len(client_list)

        for i, client in enumerate(client_list, 1):
            print(f"\n[{i}/{total_clients}] 处理: {client['name']}")

            # 个性化内容
            html_content, text_content = self.personalize_content(client, template, txt_template)

            if not text_content:
                print(f"✗ 无内容可发送给 {client['name']}")
                # 记录日志
                log_entry = {
                    'client': client['name'],
                    'email': client['email'],
                    'time': datetime.now().isoformat(),
                    'status': '失败',
                    '原因': '无内容'
                }
                self.sent_log.append(log_entry)
                continue

            content = [text_content, html_content] if html_content else text_content
            # 发送
            if self.send_to_client(client, content):
                sent_count += 1

            # 延迟,避免被限制
            if i < total_clients:
                delay = self.send_settings.delay_between_emails
                print(f"等待 {delay} 秒...")
                time.sleep(delay)

            # 分批暂停(如果发送量大)
            if i % self.send_settings.max_emails_per_batch == 0:
                print(f"\n已发送 {i} 封,暂停 60 秒...")
                time.sleep(60)

        # 保存日志
        self.save_log()

        print(f"\n✅ 完成!成功发送 {sent_count}/{total_clients} 封邮件")

        # 关闭连接
        if self.yag:
            self.yag.close()


if __name__ == "__main__":
    # 客户数据文件
    client_data_file = './clients/clients.xlsx'
    # 邮件模板文件
    txt_template_file = './templates/christmas.txt'
    sender = EmailSender(client_data_file=client_data_file,
                         txt_template_file=txt_template_file)
    sender.run()

到此这篇关于Python实现批量发送邮件脚本的文章就介绍到这了,更多相关Python发送邮件内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文