
Monitoring Website Changes with Automatic Notifications in Python

Author: 闲人编程

In today's fast-moving digital world, real-time monitoring of website content matters more and more. This article shows how to use Python to monitor a website for changes and notify you automatically when one occurs.

1. Introduction

In today's fast-moving digital world, real-time monitoring of website content matters more and more. Whether it is a competitor's price adjustment, breaking news, product stock status, or updates to a page you personally care about, getting this information promptly brings a real competitive advantage and convenience.

It is often claimed that a business that learns of a competitor's price change even 30 minutes early can adjust its own pricing strategy in time, with some estimates putting the resulting sales lift at 5-15%. For individual users, automated monitoring saves the time otherwise spent manually refreshing pages and lets important information come to you instead.

(Figure: application scenarios of website monitoring)

2. Technical overview

2.1 Core monitoring strategies

Website change monitoring mainly relies on the following techniques, all of which appear later in this article:

- Content hashing: fingerprint the page (or a CSS-selected region) and compare fingerprints between checks
- Text diffing: use difflib to describe exactly what changed
- Browser rendering: drive headless Chrome via Selenium for JavaScript-heavy pages
- Visual comparison: take screenshots and compare them with perceptual hashes
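The hash-comparison strategy boils down to fingerprinting the fetched content and comparing fingerprints between runs; a minimal sketch (the two sample snippets are made up):

```python
import hashlib

def fingerprint(text: str) -> str:
    # MD5 is used only as a fast fingerprint here, not for security
    return hashlib.md5(text.encode("utf-8")).hexdigest()

old_snapshot = "<h1>Price: $10</h1>"
new_snapshot = "<h1>Price: $12</h1>"

# a change is simply "the fingerprints differ"
changed = fingerprint(old_snapshot) != fingerprint(new_snapshot)
print(changed)  # True
```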

2.2 System architecture

Our monitoring system consists of the following core components:

- Fetcher: downloads pages with requests or Selenium
- Change detector: hash comparison plus diff analysis
- Storage: SQLite tables for snapshots, change records, and per-site settings
- Notifier: email and webhook channels
- Scheduler: periodic checks via the schedule library
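These components are tied together by a JSON configuration file. A possible monitor_config.json, mirroring the defaults used later in the article (all values are placeholders):

```json
{
  "monitoring_interval": 300,
  "notification_methods": {
    "email": {
      "enabled": false,
      "smtp_server": "smtp.gmail.com",
      "smtp_port": 587,
      "username": "your_email@gmail.com",
      "password": "your_password",
      "recipient": "recipient@example.com"
    },
    "webhook": {
      "enabled": false,
      "url": "https://your-webhook-url.com"
    }
  },
  "websites": {},
  "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
}
```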

3. Environment and libraries

3.1 Installing the required Python packages

# Page fetching and parsing
pip install requests
pip install beautifulsoup4
pip install selenium
pip install lxml

# Browser automation (for JavaScript-rendered pages)
pip install webdriver-manager

# Data handling and storage
pip install pandas
pip install sqlalchemy

# Notifications: smtplib ships with Python; requests (above) doubles for webhooks

# Task scheduling
pip install schedule
pip install APScheduler

# Other tools (hashlib and difflib are in the standard library)
pip install pillow     # for screenshot comparison
pip install imagehash  # perceptual hashing of screenshots

3.2 Overview of the core libraries

# Import the required libraries
import requests
from bs4 import BeautifulSoup
import hashlib
import time
import smtplib
import schedule
import json
import sqlite3
from datetime import datetime
import logging
from typing import Dict, List, Optional, Tuple
import difflib
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
import os
from pathlib import Path

4. Building the basic website monitor

The core monitoring engine

Let's start with the basic monitor:

class WebsiteMonitor:
    """
    Website change monitor - the core monitoring engine.
    Provides content monitoring, change detection, and notifications.
    """
    
    def __init__(self, config_file: str = "monitor_config.json"):
        """
        Initialize the website monitor.
        
        Args:
            config_file: path to the configuration file
        """
        self.config_file = Path(config_file)
        self.setup_logging()  # set up logging first: load_config logs errors
        self.config = self.load_config()
        self.setup_database()
        
    def setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('website_monitor.log', encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def load_config(self) -> Dict:
        """
        Load the monitoring configuration.
        
        Returns:
            Dict: configuration data
        """
        default_config = {
            "monitoring_interval": 300,  # 5 minutes by default
            "notification_methods": {
                "email": {
                    "enabled": False,
                    "smtp_server": "smtp.gmail.com",
                    "smtp_port": 587,
                    "username": "your_email@gmail.com",
                    "password": "your_password",
                    "recipient": "recipient@example.com"
                },
                "webhook": {
                    "enabled": False,
                    "url": "https://your-webhook-url.com"
                }
            },
            "websites": {},
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        
        try:
            if self.config_file.exists():
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    loaded_config = json.load(f)
                    # Merge configs so all required fields are present
                    return self.merge_configs(default_config, loaded_config)
            else:
                # Create a default configuration file
                self.save_config(default_config)
                return default_config
        except Exception as e:
            self.logger.error(f"Failed to load config file: {str(e)}")
            return default_config
    
    def merge_configs(self, default: Dict, user: Dict) -> Dict:
        """
        Merge the default configuration with the user configuration.
        
        Args:
            default: default configuration
            user: user configuration
            
        Returns:
            Dict: merged configuration
        """
        result = default.copy()
        
        def deep_merge(default_dict, user_dict):
            for key, value in user_dict.items():
                if key in default_dict and isinstance(default_dict[key], dict) and isinstance(value, dict):
                    deep_merge(default_dict[key], value)
                else:
                    default_dict[key] = value
        
        deep_merge(result, user)
        return result
    
    def save_config(self, config: Dict = None):
        """
        Save the configuration to file.
        
        Args:
            config: configuration to save; the current configuration if None
        """
        if config is None:
            config = self.config
        
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2, ensure_ascii=False)
            self.logger.info("Configuration saved")
        except Exception as e:
            self.logger.error(f"Failed to save config: {str(e)}")
    
    def setup_database(self):
        """Set up the SQLite database."""
        try:
            self.db_path = Path("website_monitor.db")
            self.conn = sqlite3.connect(self.db_path)
            self.create_tables()
            self.logger.info("Database initialized")
        except Exception as e:
            self.logger.error(f"Database initialization failed: {str(e)}")
    
    def create_tables(self):
        """Create the database tables."""
        cursor = self.conn.cursor()
        
        # Change-record table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS website_changes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                website_url TEXT NOT NULL,
                change_type TEXT NOT NULL,
                change_description TEXT,
                previous_content_hash TEXT,
                current_content_hash TEXT,
                change_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                notified BOOLEAN DEFAULT 0
            )
        ''')
        
        # Content-snapshot table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS content_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                website_url TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                content_text TEXT,
                snapshot_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # Per-site monitoring settings table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS monitor_configs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                website_url TEXT UNIQUE NOT NULL,
                check_interval INTEGER DEFAULT 300,
                css_selector TEXT,
                enabled BOOLEAN DEFAULT 1,
                last_checked DATETIME
            )
        ''')
        
        self.conn.commit()
    
    def add_website(self, url: str, check_interval: int = 300, 
                   css_selector: str = None, enabled: bool = True) -> bool:
        """
        Add a website to monitor.
        
        Args:
            url: website URL
            check_interval: check interval in seconds
            css_selector: CSS selector (to monitor a specific part of the page)
            enabled: whether monitoring is enabled
            
        Returns:
            bool: True on success
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO monitor_configs 
                (website_url, check_interval, css_selector, enabled, last_checked)
                VALUES (?, ?, ?, ?, ?)
            ''', (url, check_interval, css_selector, enabled, None))
            
            self.conn.commit()
            
            # Update the in-memory configuration
            if 'websites' not in self.config:
                self.config['websites'] = {}
            
            self.config['websites'][url] = {
                'check_interval': check_interval,
                'css_selector': css_selector,
                'enabled': enabled
            }
            
            self.save_config()
            self.logger.info(f"Added monitored site: {url}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to add site {url}: {str(e)}")
            return False
    
    def remove_website(self, url: str) -> bool:
        """
        Remove a monitored website.
        
        Args:
            url: website URL
            
        Returns:
            bool: True on success
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('DELETE FROM monitor_configs WHERE website_url = ?', (url,))
            self.conn.commit()
            
            # Update the in-memory configuration
            if url in self.config.get('websites', {}):
                del self.config['websites'][url]
                self.save_config()
            
            self.logger.info(f"Removed monitored site: {url}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to remove site {url}: {str(e)}")
            return False
    
    def fetch_web_content(self, url: str, css_selector: str = None) -> Tuple[Optional[str], Optional[str]]:
        """
        Fetch page content.
        
        Args:
            url: page URL
            css_selector: CSS selector (to extract a specific part)
            
        Returns:
            Tuple: (raw content, extracted content)
        """
        try:
            headers = {
                'User-Agent': self.config.get('user_agent', 
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
            }
            
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            
            raw_content = response.text
            
            # If a CSS selector was given, extract just that part
            extracted_content = None
            if css_selector:
                soup = BeautifulSoup(raw_content, 'html.parser')
                selected_elements = soup.select(css_selector)
                if selected_elements:
                    extracted_content = '\n'.join([elem.get_text(strip=True) for elem in selected_elements])
            
            self.logger.debug(f"Fetched page content: {url}")
            return raw_content, extracted_content
            
        except requests.RequestException as e:
            self.logger.error(f"Failed to fetch page {url}: {str(e)}")
            return None, None
        except Exception as e:
            self.logger.error(f"Failed to parse page {url}: {str(e)}")
            return None, None
    
    def calculate_content_hash(self, content: str) -> str:
        """
        Compute a content hash.
        
        Args:
            content: text content
            
        Returns:
            str: MD5 hash of the content (used as a fast fingerprint, not for security)
        """
        return hashlib.md5(content.encode('utf-8')).hexdigest()
    
    def save_content_snapshot(self, url: str, content_hash: str, content_text: str = None):
        """
        Save a content snapshot to the database.
        
        Args:
            url: website URL
            content_hash: content hash
            content_text: content text (optional)
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO content_snapshots (website_url, content_hash, content_text)
                VALUES (?, ?, ?)
            ''', (url, content_hash, content_text))
            self.conn.commit()
        except Exception as e:
            self.logger.error(f"Failed to save content snapshot: {str(e)}")
    
    def get_previous_content_hash(self, url: str) -> Optional[str]:
        """
        Get the content hash from the previous check.
        
        Args:
            url: website URL
            
        Returns:
            str: previous content hash, or None if there is no record
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                SELECT content_hash FROM content_snapshots 
                WHERE website_url = ? 
                ORDER BY snapshot_timestamp DESC 
                LIMIT 1
            ''', (url,))
            
            result = cursor.fetchone()
            return result[0] if result else None
            
        except Exception as e:
            self.logger.error(f"Failed to get previous hash {url}: {str(e)}")
            return None
    
    def detect_changes(self, url: str, current_content: str, 
                      previous_hash: str = None) -> Tuple[bool, Optional[str], Optional[str]]:
        """
        Detect content changes.
        
        Args:
            url: website URL
            current_content: current content
            previous_hash: content hash from the previous check
            
        Returns:
            Tuple: (changed or not, current hash, change description)
        """
        if not current_content:
            return False, None, "Could not fetch current content"
        
        current_hash = self.calculate_content_hash(current_content)
        
        # No history yet: save the first snapshot and report no change
        if previous_hash is None:
            self.save_content_snapshot(url, current_hash, current_content)
            return False, current_hash, "First check, baseline established"
        
        # Compare the hashes
        if current_hash == previous_hash:
            return False, current_hash, "Content unchanged"
        
        # Change detected: build a detailed diff (read the old text before saving the new one)
        previous_content = self.get_previous_content_text(url)
        change_description = self.analyze_content_changes(previous_content, current_content)
        
        # Save the new snapshot
        self.save_content_snapshot(url, current_hash, current_content)
        
        return True, current_hash, change_description
    
    def get_previous_content_text(self, url: str) -> Optional[str]:
        """
        Get the content text from the previous check.
        
        Args:
            url: website URL
            
        Returns:
            str: previous content text
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                SELECT content_text FROM content_snapshots 
                WHERE website_url = ? 
                ORDER BY snapshot_timestamp DESC 
                LIMIT 1
            ''', (url,))
            
            result = cursor.fetchone()
            return result[0] if result else None
            
        except Exception as e:
            self.logger.error(f"Failed to get previous content {url}: {str(e)}")
            return None
    
    def analyze_content_changes(self, old_content: str, new_content: str) -> str:
        """
        Analyze content changes in detail.
        
        Args:
            old_content: old content
            new_content: new content
            
        Returns:
            str: change description
        """
        if not old_content or not new_content:
            return "Cannot compare content changes"
        
        # Compare the texts with difflib
        diff = difflib.unified_diff(
            old_content.splitlines(keepends=True),
            new_content.splitlines(keepends=True),
            fromfile='old content',
            tofile='new content',
            n=3
        )
        
        diff_text = ''.join(diff)
        
        if diff_text:
            # Truncate the diff to the first few lines
            lines = diff_text.split('\n')[:10]
            return "Content change detected:\n" + '\n'.join(lines)
        else:
            return "Content changed but no diff could be generated"
    
    def check_website(self, url: str) -> Dict:
        """
        Check a single website for changes.
        
        Args:
            url: website URL
            
        Returns:
            Dict: check result
        """
        self.logger.info(f"Checking site: {url}")
        
        # Look up the per-site configuration
        website_config = self.config['websites'].get(url, {})
        css_selector = website_config.get('css_selector')
        
        # Fetch the page content
        raw_content, extracted_content = self.fetch_web_content(url, css_selector)
        
        # Prefer the extracted content if a selector matched, else the raw page
        content_to_check = extracted_content if extracted_content else raw_content
        
        if not content_to_check:
            return {
                'url': url,
                'changed': False,
                'error': 'Could not fetch page content',
                'timestamp': datetime.now()
            }
        
        # Hash from the previous check
        previous_hash = self.get_previous_content_hash(url)
        
        # Detect changes
        changed, current_hash, change_description = self.detect_changes(
            url, content_to_check, previous_hash
        )
        
        result = {
            'url': url,
            'changed': changed,
            'current_hash': current_hash,
            'previous_hash': previous_hash,
            'change_description': change_description,
            'timestamp': datetime.now()
        }
        
        if changed:
            self.logger.info(f"Change detected: {url}")
            # Record the change in the database
            self.record_change(url, change_description, previous_hash, current_hash)
            # Send notifications
            self.send_notification(url, change_description)
        
        return result
    
    def record_change(self, url: str, change_description: str, 
                     previous_hash: str, current_hash: str):
        """
        Record a change in the database.
        
        Args:
            url: website URL
            change_description: change description
            previous_hash: previous hash
            current_hash: current hash
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO website_changes 
                (website_url, change_type, change_description, previous_content_hash, current_content_hash)
                VALUES (?, ?, ?, ?, ?)
            ''', (url, 'content_change', change_description, previous_hash, current_hash))
            self.conn.commit()
        except Exception as e:
            self.logger.error(f"Failed to record change: {str(e)}")
    
    def send_notification(self, url: str, change_description: str):
        """
        Send change notifications.
        
        Args:
            url: website URL
            change_description: change description
        """
        notification_methods = self.config.get('notification_methods', {})
        
        # Email notification
        if notification_methods.get('email', {}).get('enabled', False):
            self.send_email_notification(url, change_description)
        
        # Webhook notification
        if notification_methods.get('webhook', {}).get('enabled', False):
            self.send_webhook_notification(url, change_description)
    
    def send_email_notification(self, url: str, change_description: str):
        """
        Send an email notification.
        
        Args:
            url: website URL
            change_description: change description
        """
        try:
            email_config = self.config['notification_methods']['email']
            
            subject = f"Website change alert: {url}"
            body = f"""
A change was detected on a monitored website:

Site: {url}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Change details:
{change_description}

---
This notification was sent automatically by the website monitor
            """
            
            # Send the email
            self._send_email(
                email_config['smtp_server'],
                email_config['smtp_port'],
                email_config['username'],
                email_config['password'],
                email_config['recipient'],
                subject,
                body
            )
            
            self.logger.info(f"Email notification sent: {url}")
            
        except Exception as e:
            self.logger.error(f"Failed to send email notification: {str(e)}")
    
    def _send_email(self, smtp_server: str, port: int, username: str, 
                   password: str, recipient: str, subject: str, body: str):
        """
        Send an email via SMTP (uses smtplib and the email package).
        
        Args:
            smtp_server: SMTP server
            port: port
            username: sender address
            password: password (for Gmail, an app password)
            recipient: recipient address
            subject: subject line
            body: message body
        """
        from email.mime.text import MIMEText
        
        msg = MIMEText(body, 'plain', 'utf-8')
        msg['Subject'] = subject
        msg['From'] = username
        msg['To'] = recipient
        
        with smtplib.SMTP(smtp_server, port, timeout=30) as server:
            server.starttls()  # upgrade the connection before authenticating
            server.login(username, password)
            server.sendmail(username, [recipient], msg.as_string())
    
    def send_webhook_notification(self, url: str, change_description: str):
        """
        Send a webhook notification.
        
        Args:
            url: website URL
            change_description: change description
        """
        try:
            webhook_config = self.config['notification_methods']['webhook']
            webhook_url = webhook_config['url']
            
            payload = {
                'url': url,
                'change_description': change_description,
                'timestamp': datetime.now().isoformat(),
                'type': 'website_change'
            }
            
            response = requests.post(webhook_url, json=payload, timeout=10)
            response.raise_for_status()
            
            self.logger.info(f"Webhook notification sent: {url}")
            
        except Exception as e:
            self.logger.error(f"Failed to send webhook notification: {str(e)}")
    
    def check_all_websites(self) -> List[Dict]:
        """
        Check all monitored websites.
        
        Returns:
            List: check results for every site
        """
        results = []
        websites = self.config.get('websites', {})
        
        for url, config in websites.items():
            if config.get('enabled', True):
                try:
                    result = self.check_website(url)
                    results.append(result)
                    
                    # Avoid hammering the servers
                    time.sleep(1)
                    
                except Exception as e:
                    self.logger.error(f"Check failed {url}: {str(e)}")
                    results.append({
                        'url': url,
                        'changed': False,
                        'error': str(e),
                        'timestamp': datetime.now()
                    })
        
        return results
    
    def start_monitoring(self):
        """Start the scheduled monitoring loop."""
        self.logger.info("Starting the website monitoring service")
        
        interval = self.config.get('monitoring_interval', 300)
        
        # Schedule the periodic check with the schedule library
        schedule.every(interval).seconds.do(self.check_all_websites)
        
        # Run one check immediately
        self.check_all_websites()
        
        # Keep the process alive
        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("Monitoring service stopped")
        finally:
            self.cleanup()
    
    def cleanup(self):
        """Release resources."""
        if hasattr(self, 'conn'):
            self.conn.close()
        self.logger.info("Cleanup complete")

# Usage example
def demo_basic_monitor():
    """Demonstrate the basic monitoring features."""
    monitor = WebsiteMonitor()
    
    # Add websites to monitor
    print("=== Adding monitored sites ===")
    monitor.add_website(
        "https://httpbin.org/json",
        check_interval=300,  # 5 minutes
        css_selector=None    # monitor the whole page
    )
    
    monitor.add_website(
        "https://example.com",
        check_interval=600,  # 10 minutes
        css_selector="h1"    # monitor only the h1 tag
    )
    
    # Run one round of checks
    print("\n=== Checking websites ===")
    results = monitor.check_all_websites()
    
    for result in results:
        status = "changed" if result['changed'] else "unchanged"
        print(f"Site: {result['url']} - {status}")
        if result.get('error'):
            print(f"  Error: {result['error']}")
        if result.get('change_description'):
            print(f"  Change: {result['change_description'][:100]}...")
    
    # Show monitoring stats
    print("\n=== Monitoring stats ===")
    print(f"Number of monitored sites: {len(monitor.config.get('websites', {}))}")
    
    return monitor

if __name__ == "__main__":
    demo_basic_monitor()
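The difflib-based reporting behind analyze_content_changes can also be exercised on its own; the two page texts below are invented for illustration:

```python
import difflib

old_page = "Breaking: storm warning\nTemperature: 20C\n"
new_page = "Breaking: storm warning\nTemperature: 25C\n"

# unified_diff yields header lines, context lines, and +/- change lines
diff = "".join(difflib.unified_diff(
    old_page.splitlines(keepends=True),
    new_page.splitlines(keepends=True),
    fromfile="old content", tofile="new content", n=1,
))
print(diff)
```

Only the changed line appears with `-`/`+` markers, which is what makes the diff a compact change description for notifications.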

5. Advanced monitoring features

5.1 Monitoring JavaScript-rendered pages

Some sites load their content dynamically with JavaScript, so we need Selenium to drive a real browser:

class AdvancedWebsiteMonitor(WebsiteMonitor):
    """
    Advanced website monitor - supports JavaScript rendering and
    more sophisticated monitoring strategies.
    """
    
    def __init__(self, config_file: str = "monitor_config.json"):
        """Initialize the advanced monitor."""
        super().__init__(config_file)
        self.driver = None
    
    def setup_selenium_driver(self, headless: bool = True):
        """
        Set up the Selenium WebDriver.
        
        Args:
            headless: whether to run Chrome headless
        """
        try:
            chrome_options = Options()
            if headless:
                chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--window-size=1920,1080")
            
            self.driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=chrome_options
            )
            self.logger.info("Selenium WebDriver initialized")
        except Exception as e:
            self.logger.error(f"Selenium WebDriver initialization failed: {str(e)}")
    
    def fetch_web_content_selenium(self, url: str, css_selector: str = None, 
                                 wait_time: int = 5) -> Tuple[Optional[str], Optional[str]]:
        """
        Fetch page content with Selenium (supports JavaScript).
        
        Args:
            url: page URL
            css_selector: CSS selector
            wait_time: seconds to wait for the page to load
            
        Returns:
            Tuple: (raw content, extracted content)
        """
        if not self.driver:
            self.setup_selenium_driver()
        
        try:
            self.driver.get(url)
            time.sleep(wait_time)  # wait for JavaScript to run
            
            raw_content = self.driver.page_source
            
            # Extract the selected part, if any
            extracted_content = None
            if css_selector:
                try:
                    elements = self.driver.find_elements(By.CSS_SELECTOR, css_selector)
                    extracted_content = '\n'.join([element.text for element in elements])
                except Exception as e:
                    self.logger.warning(f"Failed to extract elements {url}: {str(e)}")
            
            return raw_content, extracted_content
            
        except Exception as e:
            self.logger.error(f"Selenium fetch failed {url}: {str(e)}")
            return None, None
    
    def take_screenshot(self, url: str, screenshot_path: str = None) -> Optional[str]:
        """
        Take a screenshot of a page.
        
        Args:
            url: page URL
            screenshot_path: where to save the screenshot
            
        Returns:
            str: path of the screenshot file
        """
        if not self.driver:
            self.setup_selenium_driver()
        
        try:
            self.driver.get(url)
            time.sleep(3)  # wait for the page to load
            
            if screenshot_path is None:
                screenshot_dir = Path("screenshots")
                screenshot_dir.mkdir(exist_ok=True)
                filename = f"screenshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
                screenshot_path = screenshot_dir / filename
            
            self.driver.save_screenshot(str(screenshot_path))
            self.logger.info(f"Screenshot saved: {screenshot_path}")
            return str(screenshot_path)
            
        except Exception as e:
            self.logger.error(f"Screenshot failed {url}: {str(e)}")
            return None
    
    def compare_screenshots(self, old_screenshot: str, new_screenshot: str) -> float:
        """
        Compare two screenshots and return their similarity.
        
        Args:
            old_screenshot: path to the old screenshot
            new_screenshot: path to the new screenshot
            
        Returns:
            float: similarity in the range 0-1
        """
        try:
            from PIL import Image
            import imagehash  # pip install imagehash
            
            # Perceptual hash of each image
            old_hash = imagehash.average_hash(Image.open(old_screenshot))
            new_hash = imagehash.average_hash(Image.open(new_screenshot))
            
            # Similarity = 1 - normalized Hamming distance over the 64 hash bits
            similarity = 1 - (old_hash - new_hash) / len(old_hash.hash) ** 2
            return similarity
            
        except Exception as e:
            self.logger.error(f"Screenshot comparison failed: {str(e)}")
            return 0.0
    
    def monitor_visual_changes(self, url: str, similarity_threshold: float = 0.95) -> bool:
        """
        Monitor a page for visual changes.
        
        Args:
            url: page URL
            similarity_threshold: similarity threshold
            
        Returns:
            bool: whether a significant change was detected
        """
        # Take the current screenshot
        current_screenshot = self.take_screenshot(url)
        if not current_screenshot:
            return False
        
        # Find the previous screenshot
        previous_screenshot = self.get_previous_screenshot(url)
        
        if not previous_screenshot:
            # First run: save a reference screenshot
            self.save_screenshot_reference(url, current_screenshot)
            return False
        
        # Compare the screenshots
        similarity = self.compare_screenshots(previous_screenshot, current_screenshot)
        
        if similarity < similarity_threshold:
            self.logger.info(f"Visual change detected: {url}, similarity: {similarity:.3f}")
            # Record the change
            self.record_visual_change(url, similarity, current_screenshot)
            return True
        
        return False
    
    def get_previous_screenshot(self, url: str) -> Optional[str]:
        """
        Find the most recent reference screenshot for a URL.
        
        Args:
            url: website URL
            
        Returns:
            str: screenshot path
        """
        # A full implementation would track this in the database;
        # simplified here: look for the newest reference file on disk
        screenshot_dir = Path("screenshots")
        if not screenshot_dir.exists():
            return None
        
        # Match the naming used by save_screenshot_reference below
        sanitized = url.replace('://', '_').replace('/', '_')
        pattern = f"reference_*_{sanitized}.png"
        screenshots = list(screenshot_dir.glob(pattern))
        
        if screenshots:
            return str(max(screenshots, key=os.path.getctime))
        
        return None
    
    def save_screenshot_reference(self, url: str, screenshot_path: str):
        """
        Save a screenshot as the reference for a URL.
        
        Args:
            url: website URL
            screenshot_path: screenshot path
        """
        # Rename the file so the URL is encoded in the name
        sanitized = url.replace('://', '_').replace('/', '_')
        new_filename = f"reference_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{sanitized}.png"
        new_path = Path("screenshots") / new_filename
        
        try:
            os.rename(screenshot_path, new_path)
            self.logger.info(f"Reference screenshot saved: {new_path}")
        except Exception as e:
            self.logger.error(f"Failed to save reference screenshot: {str(e)}")
    
    def record_visual_change(self, url: str, similarity: float, screenshot_path: str):
        """
        Record a visual change.
        
        Args:
            url: website URL
            similarity: similarity score
            screenshot_path: screenshot path
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO website_changes 
                (website_url, change_type, change_description, previous_content_hash, current_content_hash)
                VALUES (?, ?, ?, ?, ?)
            ''', (url, 'visual_change', f'Visual change detected, similarity: {similarity:.3f}', None, None))
            self.conn.commit()
            
            # Send notifications
            self.send_notification(url, f"Visual change detected, similarity: {similarity:.3f}")
            
        except Exception as e:
            self.logger.error(f"Failed to record visual change: {str(e)}")
    
    def cleanup(self):
        """Release resources."""
        if self.driver:
            self.driver.quit()
        super().cleanup()

# Usage example
def demo_advanced_monitor():
    """Demonstrate the advanced monitoring features."""
    monitor = AdvancedWebsiteMonitor()
    
    # Add a site whose content needs JavaScript rendering
    print("=== Adding a JavaScript site ===")
    monitor.add_website(
        "https://example.com",
        check_interval=600,
        css_selector=".dynamic-content"  # monitor dynamically loaded content
    )
    
    # Try visual monitoring
    print("\n=== Testing visual monitoring ===")
    visual_change = monitor.monitor_visual_changes("https://example.com")
    print(f"Visual change detected: {'yes' if visual_change else 'no'}")
    
    # Fetch content via Selenium
    print("\n=== Fetching content with Selenium ===")
    content, extracted = monitor.fetch_web_content_selenium(
        "https://example.com", 
        "h1"
    )
    
    if extracted:
        print(f"Extracted content: {extracted[:100]}...")
    
    return monitor

if __name__ == "__main__":
    demo_advanced_monitor()
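The similarity score in compare_screenshots is just one minus the normalized Hamming distance between two 64-bit perceptual hashes. That arithmetic can be checked with a stdlib-only sketch (the hash values here are arbitrary, not real image hashes):

```python
def hamming_similarity(h1: int, h2: int, bits: int = 64) -> float:
    # fraction of hash bits on which the two images agree
    return 1 - bin(h1 ^ h2).count("1") / bits

identical = hamming_similarity(0x0F0F0F0F, 0x0F0F0F0F)  # identical hashes -> 1.0
one_bit_off = hamming_similarity(0b1, 0b0)              # one differing bit -> 63/64
print(identical, one_bit_off)
```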

5.2 Smarter change-detection strategies

class SmartChangeDetector:
    """
    Smart change detector - combines several strategies to
    improve detection accuracy.
    """
    
    def __init__(self):
        self.setup_logging()
    
    def setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def detect_significant_changes(self, old_content: str, new_content: str, 
                                 content_type: str = "general") -> Tuple[bool, str]:
        """
        Detect significant changes.
        
        Args:
            old_content: old content
            new_content: new content
            content_type: content type ("general", "price", "news", ...)
            
        Returns:
            Tuple: (significant change or not, description)
        """
        if not old_content or not new_content:
            return False, "Empty content, nothing to compare"
        
        # Basic hash comparison
        old_hash = hashlib.md5(old_content.encode()).hexdigest()
        new_hash = hashlib.md5(new_content.encode()).hexdigest()
        
        if old_hash == new_hash:
            return False, "Content is identical"
        
        # Dispatch to a strategy based on the content type
        if content_type == "price":
            return self.detect_price_changes(old_content, new_content)
        elif content_type == "news":
            return self.detect_news_changes(old_content, new_content)
        else:
            return self.detect_general_changes(old_content, new_content)
    
    def detect_price_changes(self, old_content: str, new_content: str) -> Tuple[bool, str]:
        """
        Detect price changes.
        
        Args:
            old_content: previous content
            new_content: current content
            
        Returns:
            Tuple: (price changed?, change description)
        """
        # Extract price information (simplified implementation)
        old_prices = self.extract_prices(old_content)
        new_prices = self.extract_prices(new_content)
        
        if not old_prices or not new_prices:
            return True, "Change detected, but no price information could be extracted"
        
        changes = []
        
        # Compare prices pairwise; a different number of prices also counts
        if len(old_prices) != len(new_prices):
            changes.append(f"Number of prices changed: {len(old_prices)} → {len(new_prices)}")
        
        for i, (old_price, new_price) in enumerate(zip(old_prices, new_prices)):
            if old_price != new_price:
                if old_price:  # guard against division by zero
                    change_percent = ((new_price - old_price) / old_price) * 100
                    changes.append(f"Price {i+1}: {old_price} → {new_price} ({change_percent:+.2f}%)")
                else:
                    changes.append(f"Price {i+1}: {old_price} → {new_price}")
        
        if changes:
            return True, "Price changes:\n" + "\n".join(changes)
        else:
            return True, "Change detected, but it is not a price change"
    
    def extract_prices(self, content: str) -> List[float]:
        """
        Extract prices from text.
        
        Args:
            content: text content
            
        Returns:
            List: extracted prices
        """
        import re
        
        # Simple price pattern: an optional currency symbol followed by digits
        # with a two-digit decimal part, e.g. "$19.99" or "12,50"
        price_pattern = r'[\$€¥£]?\s?(\d+[.,]\d{2})'
        matches = re.findall(price_pattern, content)
        
        prices = []
        for match in matches:
            # Normalize "12,50" to "12.50" before conversion
            price_str = match.replace(',', '.')
            try:
                prices.append(float(price_str))
            except ValueError:
                continue
        
        return prices
    
    def detect_news_changes(self, old_content: str, new_content: str) -> Tuple[bool, str]:
        """
        Detect news content changes.
        
        Args:
            old_content: previous content
            new_content: current content
            
        Returns:
            Tuple: (news changed?, change description)
        """
        # Split into paragraphs
        old_paragraphs = [p.strip() for p in old_content.split('\n') if p.strip()]
        new_paragraphs = [p.strip() for p in new_content.split('\n') if p.strip()]
        
        # Find added and removed paragraphs via set difference
        new_paragraphs_set = set(new_paragraphs)
        old_paragraphs_set = set(old_paragraphs)
        
        added_paragraphs = new_paragraphs_set - old_paragraphs_set
        removed_paragraphs = old_paragraphs_set - new_paragraphs_set
        
        changes = []
        
        if added_paragraphs:
            changes.append(f"{len(added_paragraphs)} paragraph(s) added")
            # Show the first few added paragraphs
            for i, para in enumerate(list(added_paragraphs)[:3]):
                changes.append(f"  Added {i+1}: {para[:100]}...")
        
        if removed_paragraphs:
            changes.append(f"{len(removed_paragraphs)} paragraph(s) removed")
        
        if changes:
            return True, "News content updated:\n" + "\n".join(changes)
        else:
            return True, "Change detected, but the specific update could not be identified"
    
    def detect_general_changes(self, old_content: str, new_content: str) -> Tuple[bool, str]:
        """
        Detect general content changes.
        
        Args:
            old_content: previous content
            new_content: current content
            
        Returns:
            Tuple: (significant change?, change description)
        """
        # Compute the proportion of the content that changed
        diff_ratio = self.calculate_change_ratio(old_content, new_content)
        
        if diff_ratio < 0.01:  # below the 1% change threshold
            return False, f"Minor change ({diff_ratio:.2%})"
        elif diff_ratio < 0.1:  # below the 10% change threshold
            return True, f"Moderate change ({diff_ratio:.2%})"
        else:
            return True, f"Major change ({diff_ratio:.2%})"
    
    def calculate_change_ratio(self, old_content: str, new_content: str) -> float:
        """
        Calculate the proportion of content that changed.
        
        Args:
            old_content: previous content
            new_content: current content
            
        Returns:
            float: change ratio (0-1)
        """
        if not old_content or not new_content:
            return 1.0  # if either side is empty, treat it as a complete change
        
        # Use difflib to compute similarity
        matcher = difflib.SequenceMatcher(None, old_content, new_content)
        similarity = matcher.ratio()
        
        return 1 - similarity  # return the change ratio
    
    def is_seasonal_content(self, content: str) -> bool:
        """
        Check whether the content is seasonal (ads, banners, promotions, ...).
        
        Args:
            content: text content
            
        Returns:
            bool: True if the content looks seasonal
        """
        seasonal_keywords = [
            '促销', '特价', '优惠', '广告', 'banner', 'promotion',
            'sale', 'discount', 'limited'
        ]
        
        content_lower = content.lower()
        for keyword in seasonal_keywords:
            if keyword in content_lower:
                return True
        
        return False
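The price-detection strategy boils down to a regular expression plus normalization. Here is a standalone sketch in the same spirit as `extract_prices`; the pattern and sample strings are illustrative:

```python
import re

def extract_prices(text):
    """Pull two-decimal prices (optionally preceded by a currency symbol) from text."""
    # Matches e.g. "$19.99", "19.99", or "12,50"; the comma form is normalized below
    matches = re.findall(r'[\$€¥£]?\s?(\d+[.,]\d{2})', text)
    return [float(m.replace(',', '.')) for m in matches]

if __name__ == "__main__":
    old_page = "Wireless mouse now $24.99 - free shipping"
    new_page = "Wireless mouse now $19.99 - free shipping"
    old_price = extract_prices(old_page)[0]
    new_price = extract_prices(new_page)[0]
    change = (new_price - old_price) / old_price * 100
    print(f"{old_price} -> {new_price} ({change:+.2f}%)")  # 24.99 -> 19.99 (-20.01%)
```

A production rule would also need to handle thousands separators and prices without decimals; this pattern only covers the common two-decimal forms.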

# Integrate smart detection into the monitor
class SmartWebsiteMonitor(AdvancedWebsiteMonitor):
    """
    Smart website monitor - integrates smart change detection
    """
    
    def __init__(self, config_file: str = "monitor_config.json"):
        super().__init__(config_file)
        self.change_detector = SmartChangeDetector()
    
    def detect_changes(self, url: str, current_content: str, 
                      previous_hash: str = None) -> Tuple[bool, Optional[str], Optional[str]]:
        """
        Detect changes using the smart detection strategies.
        
        Args:
            url: website URL
            current_content: current content
            previous_hash: hash of the previous content
            
        Returns:
            Tuple: (changed?, current hash, change description)
        """
        if not current_content:
            return False, None, "Could not fetch the current content"
        
        current_hash = self.calculate_content_hash(current_content)
        
        # No history yet: save the initial snapshot as the baseline
        if previous_hash is None:
            self.save_content_snapshot(url, current_hash, current_content)
            return False, current_hash, "First check; baseline established"
        
        # Check for seasonal content (ads, banners, ...)
        if self.change_detector.is_seasonal_content(current_content):
            self.logger.info(f"Seasonal content detected; probably no alert needed: {url}")
            # Still save the snapshot, but do not trigger an alert
            self.save_content_snapshot(url, current_hash, current_content)
            return False, current_hash, "Seasonal content change"
        
        # Fetch the previous content
        previous_content = self.get_previous_content_text(url)
        
        if not previous_content:
            self.save_content_snapshot(url, current_hash, current_content)
            return False, current_hash, "Could not fetch the previous content"
        
        # Run the smart detection
        content_type = self.determine_content_type(url)
        significant_change, change_description = self.change_detector.detect_significant_changes(
            previous_content, current_content, content_type
        )
        
        # Save the new snapshot
        self.save_content_snapshot(url, current_hash, current_content)
        
        return significant_change, current_hash, change_description
    
    def determine_content_type(self, url: str) -> str:
        """
        Infer the content type from the URL.
        
        Args:
            url: website URL
            
        Returns:
            str: content type
        """
        # Simple URL pattern matching
        if any(keyword in url.lower() for keyword in ['amazon', 'taobao', 'jd', 'shop']):
            return "price"
        elif any(keyword in url.lower() for keyword in ['news', 'blog', 'article']):
            return "news"
        else:
            return "general"
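The hash-based fast path in `detect_changes` — fingerprint the content, compare it against the last stored snapshot, update the baseline — can be sketched standalone. The in-memory dict here stands in for the SQLite snapshot table:

```python
import hashlib

def content_hash(text):
    """MD5 fingerprint of page content (collision resistance is not needed here)."""
    return hashlib.md5(text.encode('utf-8')).hexdigest()

snapshots = {}  # url -> last seen hash, standing in for the snapshot table

def changed_since_last_check(url, content):
    """Return True if content differs from the stored baseline; store the new hash."""
    new_hash = content_hash(content)
    old_hash = snapshots.get(url)
    snapshots[url] = new_hash
    return old_hash is not None and old_hash != new_hash

if __name__ == "__main__":
    print(changed_since_last_check("https://example.com", "v1"))  # False (baseline)
    print(changed_since_last_check("https://example.com", "v1"))  # False (unchanged)
    print(changed_since_last_check("https://example.com", "v2"))  # True  (changed)
```

The first check never alerts: it only establishes the baseline, exactly as `detect_changes` does when `previous_hash` is `None`.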

# Usage example
def demo_smart_monitor():
    """Demonstrate the smart monitoring features"""
    monitor = SmartWebsiteMonitor()
    
    # Add different kinds of sites to monitor
    print("=== Adding sites for smart monitoring ===")
    
    # E-commerce site (price monitoring)
    monitor.add_website(
        "https://httpbin.org/json",  # stands in for an e-commerce site
        check_interval=300,
        css_selector=None
    )
    
    # News site
    monitor.add_website(
        "https://example.com",  # stands in for a news site
        check_interval=600,
        css_selector="p"  # watch paragraph content
    )
    
    # Run a check
    print("\n=== Running a smart check ===")
    results = monitor.check_all_websites()
    
    for result in results:
        status = "significant change" if result['changed'] else "no significant change"
        print(f"Site: {result['url']} - {status}")
        if result.get('change_description'):
            print(f"  Analysis: {result['change_description']}")
    
    return monitor

if __name__ == "__main__":
    demo_smart_monitor()
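The 1%/10% thresholds used by `detect_general_changes` are easy to explore in isolation with `difflib`; the sample strings below are invented:

```python
import difflib

def change_ratio(old_text, new_text):
    """Return the fraction of content that changed, between 0 and 1."""
    if not old_text or not new_text:
        return 1.0  # an empty side counts as a complete change
    return 1 - difflib.SequenceMatcher(None, old_text, new_text).ratio()

def classify(ratio):
    """Map a change ratio onto the article's 1% / 10% thresholds."""
    if ratio < 0.01:
        return "minor"
    elif ratio < 0.1:
        return "moderate"
    return "major"

if __name__ == "__main__":
    old = "Breaking: markets open flat. Weather: sunny all week."
    new = "Breaking: markets open sharply higher. Weather: sunny all week."
    r = change_ratio(old, new)
    print(f"{r:.2%} changed -> {classify(r)}")
```

`SequenceMatcher.ratio()` is quadratic in the worst case, so for very large pages it is worth comparing only the extracted region (the CSS-selected content) rather than the whole HTML.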

6. Complete Code Listing

Below is the complete code used in this article:

"""
Website change monitoring system - complete implementation
Date: 2024
"""

import requests
from bs4 import BeautifulSoup
import hashlib
import time
import smtplib
import schedule
import json
import sqlite3
from datetime import datetime
import logging
from typing import Dict, List, Optional, Tuple
import difflib
import os
from pathlib import Path
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Selenium imports (optional)
try:
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By
    from webdriver_manager.chrome import ChromeDriverManager
    SELENIUM_AVAILABLE = True
except ImportError:
    SELENIUM_AVAILABLE = False

# Image-processing imports (optional)
try:
    from PIL import Image
    import imagehash
    IMAGE_PROCESSING_AVAILABLE = True
except ImportError:
    IMAGE_PROCESSING_AVAILABLE = False

class WebsiteMonitor:
    """
    Website change monitor - complete implementation
    """
    
    def __init__(self, config_file: str = "monitor_config.json"):
        self.config_file = Path(config_file)
        self.config = self.load_config()
        self.setup_logging()
        self.setup_database()
        self.driver = None
        
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('website_monitor.log', encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def load_config(self) -> Dict:
        default_config = {
            "monitoring_interval": 300,
            "notification_methods": {
                "email": {
                    "enabled": False,
                    "smtp_server": "smtp.gmail.com",
                    "smtp_port": 587,
                    "username": "your_email@gmail.com",
                    "password": "your_password",
                    "recipient": "recipient@example.com"
                },
                "webhook": {
                    "enabled": False,
                    "url": "https://your-webhook-url.com"
                },
                "telegram": {
                    "enabled": False,
                    "bot_token": "your_bot_token",
                    "chat_id": "your_chat_id"
                }
            },
            "websites": {},
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "use_selenium": False,
            "headless_browser": True
        }
        
        try:
            if self.config_file.exists():
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    loaded_config = json.load(f)
                    return self.merge_configs(default_config, loaded_config)
            else:
                self.save_config(default_config)
                return default_config
        except Exception as e:
            self.logger.error(f"Failed to load config file: {str(e)}")
            return default_config
    
    def merge_configs(self, default: Dict, user: Dict) -> Dict:
        result = default.copy()
        
        def deep_merge(default_dict, user_dict):
            for key, value in user_dict.items():
                if key in default_dict and isinstance(default_dict[key], dict) and isinstance(value, dict):
                    deep_merge(default_dict[key], value)
                else:
                    default_dict[key] = value
        
        deep_merge(result, user)
        return result
    
    def save_config(self, config: Dict = None):
        if config is None:
            config = self.config
        
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2, ensure_ascii=False)
            self.logger.info("Configuration saved")
        except Exception as e:
            self.logger.error(f"Failed to save configuration: {str(e)}")
    
    def setup_database(self):
        try:
            self.db_path = Path("website_monitor.db")
            self.conn = sqlite3.connect(self.db_path)
            self.create_tables()
            self.logger.info("Database initialized")
        except Exception as e:
            self.logger.error(f"Database initialization failed: {str(e)}")
    
    def create_tables(self):
        cursor = self.conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS website_changes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                website_url TEXT NOT NULL,
                change_type TEXT NOT NULL,
                change_description TEXT,
                previous_content_hash TEXT,
                current_content_hash TEXT,
                change_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                notified BOOLEAN DEFAULT 0
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS content_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                website_url TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                content_text TEXT,
                snapshot_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS monitor_configs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                website_url TEXT UNIQUE NOT NULL,
                check_interval INTEGER DEFAULT 300,
                css_selector TEXT,
                enabled BOOLEAN DEFAULT 1,
                last_checked DATETIME
            )
        ''')
        
        self.conn.commit()
    
    def setup_selenium_driver(self):
        """Set up the Selenium WebDriver"""
        if not SELENIUM_AVAILABLE:
            self.logger.warning("Selenium is not available; cannot initialize the browser driver")
            return
        
        try:
            chrome_options = Options()
            if self.config.get('headless_browser', True):
                chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--window-size=1920,1080")
            
            self.driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=chrome_options
            )
            self.logger.info("Selenium WebDriver initialized")
        except Exception as e:
            self.logger.error(f"Selenium WebDriver initialization failed: {str(e)}")
    
    def fetch_web_content(self, url: str, css_selector: str = None) -> Tuple[Optional[str], Optional[str]]:
        """Fetch web content"""
        use_selenium = self.config.get('use_selenium', False)
        
        if use_selenium and SELENIUM_AVAILABLE:
            return self.fetch_web_content_selenium(url, css_selector)
        else:
            return self.fetch_web_content_requests(url, css_selector)
    
    def fetch_web_content_requests(self, url: str, css_selector: str = None) -> Tuple[Optional[str], Optional[str]]:
        """Fetch web content with requests"""
        try:
            headers = {
                'User-Agent': self.config.get('user_agent', 
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
            }
            
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            
            raw_content = response.text
            extracted_content = None
            
            if css_selector:
                soup = BeautifulSoup(raw_content, 'html.parser')
                selected_elements = soup.select(css_selector)
                if selected_elements:
                    extracted_content = '\n'.join([elem.get_text(strip=True) for elem in selected_elements])
            
            self.logger.debug(f"Fetched page content: {url}")
            return raw_content, extracted_content
            
        except Exception as e:
            self.logger.error(f"Failed to fetch page content {url}: {str(e)}")
            return None, None
    
    def fetch_web_content_selenium(self, url: str, css_selector: str = None, 
                                 wait_time: int = 5) -> Tuple[Optional[str], Optional[str]]:
        """Fetch web content with Selenium"""
        if not self.driver:
            self.setup_selenium_driver()
        
        if not self.driver:
            self.logger.error("Selenium driver is not initialized")
            return None, None
        
        try:
            self.driver.get(url)
            time.sleep(wait_time)
            
            raw_content = self.driver.page_source
            extracted_content = None
            
            if css_selector:
                try:
                    elements = self.driver.find_elements(By.CSS_SELECTOR, css_selector)
                    extracted_content = '\n'.join([element.text for element in elements])
                except Exception as e:
                    self.logger.warning(f"Failed to extract elements {url}: {str(e)}")
            
            return raw_content, extracted_content
            
        except Exception as e:
            self.logger.error(f"Selenium page fetch failed {url}: {str(e)}")
            return None, None
    
    def calculate_content_hash(self, content: str) -> str:
        return hashlib.md5(content.encode('utf-8')).hexdigest()
    
    def save_content_snapshot(self, url: str, content_hash: str, content_text: str = None):
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO content_snapshots (website_url, content_hash, content_text)
                VALUES (?, ?, ?)
            ''', (url, content_hash, content_text))
            self.conn.commit()
        except Exception as e:
            self.logger.error(f"Failed to save content snapshot: {str(e)}")
    
    def get_previous_content_hash(self, url: str) -> Optional[str]:
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                SELECT content_hash FROM content_snapshots 
                WHERE website_url = ? 
                ORDER BY snapshot_timestamp DESC 
                LIMIT 1
            ''', (url,))
            result = cursor.fetchone()
            return result[0] if result else None
        except Exception as e:
            self.logger.error(f"Failed to fetch previous hash {url}: {str(e)}")
            return None
    
    def detect_changes(self, url: str, current_content: str, 
                      previous_hash: str = None) -> Tuple[bool, Optional[str], Optional[str]]:
        if not current_content:
            return False, None, "Could not fetch the current content"
        
        current_hash = self.calculate_content_hash(current_content)
        
        if previous_hash is None:
            self.save_content_snapshot(url, current_hash, current_content)
            return False, current_hash, "First check; baseline established"
        
        if current_hash == previous_hash:
            return False, current_hash, "Content unchanged"
        
        previous_content = self.get_previous_content_text(url)
        change_description = self.analyze_content_changes(previous_content, current_content)
        
        self.save_content_snapshot(url, current_hash, current_content)
        return True, current_hash, change_description
    
    def get_previous_content_text(self, url: str) -> Optional[str]:
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                SELECT content_text FROM content_snapshots 
                WHERE website_url = ? 
                ORDER BY snapshot_timestamp DESC 
                LIMIT 1
            ''', (url,))
            result = cursor.fetchone()
            return result[0] if result else None
        except Exception as e:
            self.logger.error(f"Failed to fetch previous content {url}: {str(e)}")
            return None
    
    def analyze_content_changes(self, old_content: str, new_content: str) -> str:
        if not old_content or not new_content:
            return "Unable to compare content changes"
        
        diff = difflib.unified_diff(
            old_content.splitlines(keepends=True),
            new_content.splitlines(keepends=True),
            fromfile='old content',
            tofile='new content',
            n=3
        )
        
        diff_text = ''.join(diff)
        
        if diff_text:
            lines = diff_text.split('\n')[:10]
            return "Content change detected:\n" + '\n'.join(lines)
        else:
            return "Content changed, but no diff report could be generated"
    
    def add_website(self, url: str, check_interval: int = 300, 
                   css_selector: str = None, enabled: bool = True) -> bool:
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO monitor_configs 
                (website_url, check_interval, css_selector, enabled, last_checked)
                VALUES (?, ?, ?, ?, ?)
            ''', (url, check_interval, css_selector, enabled, None))
            
            self.conn.commit()
            
            if 'websites' not in self.config:
                self.config['websites'] = {}
            
            self.config['websites'][url] = {
                'check_interval': check_interval,
                'css_selector': css_selector,
                'enabled': enabled
            }
            
            self.save_config()
            self.logger.info(f"Now monitoring website: {url}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to add website {url}: {str(e)}")
            return False
    
    def remove_website(self, url: str) -> bool:
        try:
            cursor = self.conn.cursor()
            cursor.execute('DELETE FROM monitor_configs WHERE website_url = ?', (url,))
            self.conn.commit()
            
            if url in self.config.get('websites', {}):
                del self.config['websites'][url]
                self.save_config()
            
            self.logger.info(f"Stopped monitoring website: {url}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to remove website {url}: {str(e)}")
            return False
    
    def check_website(self, url: str) -> Dict:
        self.logger.info(f"Checking website: {url}")
        
        website_config = self.config['websites'].get(url, {})
        css_selector = website_config.get('css_selector')
        
        raw_content, extracted_content = self.fetch_web_content(url, css_selector)
        content_to_check = extracted_content if extracted_content else raw_content
        
        if not content_to_check:
            return {
                'url': url,
                'changed': False,
                'error': 'Could not fetch page content',
                'timestamp': datetime.now()
            }
        
        previous_hash = self.get_previous_content_hash(url)
        changed, current_hash, change_description = self.detect_changes(
            url, content_to_check, previous_hash
        )
        
        result = {
            'url': url,
            'changed': changed,
            'current_hash': current_hash,
            'previous_hash': previous_hash,
            'change_description': change_description,
            'timestamp': datetime.now()
        }
        
        if changed:
            self.logger.info(f"Change detected: {url}")
            self.record_change(url, change_description, previous_hash, current_hash)
            self.send_notification(url, change_description)
        
        return result
    
    def record_change(self, url: str, change_description: str, 
                     previous_hash: str, current_hash: str):
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO website_changes 
                (website_url, change_type, change_description, previous_content_hash, current_content_hash)
                VALUES (?, ?, ?, ?, ?)
            ''', (url, 'content_change', change_description, previous_hash, current_hash))
            self.conn.commit()
        except Exception as e:
            self.logger.error(f"Failed to record change: {str(e)}")
    
    def send_notification(self, url: str, change_description: str):
        notification_methods = self.config.get('notification_methods', {})
        
        if notification_methods.get('email', {}).get('enabled', False):
            self.send_email_notification(url, change_description)
        
        if notification_methods.get('webhook', {}).get('enabled', False):
            self.send_webhook_notification(url, change_description)
        
        if notification_methods.get('telegram', {}).get('enabled', False):
            self.send_telegram_notification(url, change_description)
    
    def send_email_notification(self, url: str, change_description: str):
        try:
            email_config = self.config['notification_methods']['email']
            
            subject = f"Website change notification: {url}"
            body = f"""
A change was detected on a monitored website:

Site: {url}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Change details:
{change_description}

---
This notification was sent automatically by the website monitoring system
            """
            
            self._send_email(
                email_config['smtp_server'],
                email_config['smtp_port'],
                email_config['username'],
                email_config['password'],
                email_config['recipient'],
                subject,
                body
            )
            
            self.logger.info(f"Email notification sent: {url}")
            
        except Exception as e:
            self.logger.error(f"Failed to send email notification: {str(e)}")
    
    def _send_email(self, smtp_server: str, port: int, username: str, 
                   password: str, recipient: str, subject: str, body: str):
        """Send the email"""
        try:
            msg = MIMEMultipart()
            msg['From'] = username
            msg['To'] = recipient
            msg['Subject'] = subject
            
            msg.attach(MIMEText(body, 'plain', 'utf-8'))
            
            server = smtplib.SMTP(smtp_server, port)
            server.starttls()
            server.login(username, password)
            server.send_message(msg)
            server.quit()
            
        except Exception as e:
            self.logger.error(f"Email delivery failed: {str(e)}")
            raise
    
    def send_webhook_notification(self, url: str, change_description: str):
        try:
            webhook_config = self.config['notification_methods']['webhook']
            webhook_url = webhook_config['url']
            
            payload = {
                'url': url,
                'change_description': change_description,
                'timestamp': datetime.now().isoformat(),
                'type': 'website_change'
            }
            
            response = requests.post(webhook_url, json=payload, timeout=10)
            response.raise_for_status()
            
            self.logger.info(f"Webhook notification sent: {url}")
            
        except Exception as e:
            self.logger.error(f"Failed to send webhook notification: {str(e)}")
    
    def send_telegram_notification(self, url: str, change_description: str):
        try:
            telegram_config = self.config['notification_methods']['telegram']
            bot_token = telegram_config['bot_token']
            chat_id = telegram_config['chat_id']
            
            message = f"""
🔔 Website change notification

🌐 Site: {url}
⏰ Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

📝 Change details:
{change_description}

#WebsiteMonitoring
            """
            
            api_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
            payload = {
                'chat_id': chat_id,
                'text': message,
                'parse_mode': 'HTML'
            }
            
            response = requests.post(api_url, json=payload, timeout=10)
            response.raise_for_status()
            
            self.logger.info(f"Telegram notification sent: {url}")
            
        except Exception as e:
            self.logger.error(f"Failed to send Telegram notification: {str(e)}")
    
    def check_all_websites(self) -> List[Dict]:
        results = []
        websites = self.config.get('websites', {})
        
        for url, config in websites.items():
            if config.get('enabled', True):
                try:
                    result = self.check_website(url)
                    results.append(result)
                    time.sleep(1)  # avoid hammering the servers
                except Exception as e:
                    self.logger.error(f"Failed to check website {url}: {str(e)}")
                    results.append({
                        'url': url,
                        'changed': False,
                        'error': str(e),
                        'timestamp': datetime.now()
                    })
        
        return results
    
    def start_monitoring(self):
        self.logger.info("Starting the website monitoring service")
        
        interval = self.config.get('monitoring_interval', 300)
        schedule.every(interval).seconds.do(self.check_all_websites)
        
        self.check_all_websites()
        
        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("Monitoring service stopped")
        finally:
            self.cleanup()
    
    def cleanup(self):
        if self.driver:
            self.driver.quit()
        if hasattr(self, 'conn'):
            self.conn.close()
        self.logger.info("Resource cleanup complete")

def main():
    """Entry point - demonstrates the full feature set"""
    monitor = WebsiteMonitor()
    
    print("=== Website Change Monitoring System ===")
    print("Choose an action:")
    print("1. Add a website to monitor")
    print("2. Remove a monitored website")
    print("3. Check all websites now")
    print("4. Start automatic monitoring")
    print("5. Show monitoring statistics")
    print("6. Exit")
    
    while True:
        choice = input("\nEnter a choice (1-6): ").strip()
        
        if choice == '1':
            url = input("Website URL: ").strip()
            interval = input("Check interval in seconds (default 300): ").strip()
            try:
                interval = int(interval) if interval else 300
            except ValueError:
                interval = 300  # fall back to the default on invalid input
            selector = input("CSS selector (optional): ").strip()
            selector = selector if selector else None
            
            success = monitor.add_website(url, interval, selector)
            if success:
                print(f"Added website: {url}")
            else:
                print("Failed to add website")
        
        elif choice == '2':
            url = input("URL of the website to remove: ").strip()
            success = monitor.remove_website(url)
            if success:
                print(f"Removed website: {url}")
            else:
                print("Failed to remove website")
        
        elif choice == '3':
            print("Checking all websites...")
            results = monitor.check_all_websites()
            print(f"Done; {len(results)} website(s) checked")
            
            changes = [r for r in results if r['changed']]
            if changes:
                print(f"{len(changes)} website(s) changed:")
                for result in changes:
                    print(f"  - {result['url']}")
                    print(f"    Change: {result['change_description'][:100]}...")
            else:
                print("No websites changed")
        
        elif choice == '4':
            print("Starting the automatic monitoring service...")
            print("Press Ctrl+C to stop")
            monitor.start_monitoring()
        
        elif choice == '5':
            websites = monitor.config.get('websites', {})
            print(f"Number of monitored websites: {len(websites)}")
            print("Currently monitored websites:")
            for url, config in websites.items():
                status = "enabled" if config.get('enabled', True) else "disabled"
                print(f"  - {url} ({status})")
        
        elif choice == '6':
            print("Goodbye!")
            break
        
        else:
            print("Invalid choice; please try again")

if __name__ == "__main__":
    main()
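The paragraph-level news diff in `detect_news_changes` reduces to a set difference between paragraph sets. A minimal standalone sketch, with invented headlines:

```python
def paragraph_diff(old_text, new_text):
    """Return (added, removed) paragraph sets between two text versions."""
    old_paras = {p.strip() for p in old_text.split('\n') if p.strip()}
    new_paras = {p.strip() for p in new_text.split('\n') if p.strip()}
    return new_paras - old_paras, old_paras - new_paras

if __name__ == "__main__":
    old = "Storm warning issued\nLocal team wins final"
    new = "Storm warning lifted\nLocal team wins final"
    added, removed = paragraph_diff(old, new)
    print(sorted(added))    # ['Storm warning lifted']
    print(sorted(removed))  # ['Storm warning issued']
```

Because sets ignore ordering, reordered paragraphs are not reported as changes; only genuinely new or deleted text triggers an alert, which suits news pages well.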

7. Code Review and Optimization

To ensure code quality and reduce bugs, all of the code above was reviewed along the following lines:

7.1 Code quality checks

7.2 Performance optimization

7.3 Security improvements

7.4 Robustness improvements

8. Conclusion

Through the walkthrough and code examples in this article, we built a fully functional website change monitoring system. It not only detects changes in page content but also notifies users promptly through multiple channels, greatly improving the efficiency of information gathering.

8.1 Key takeaways

8.2 Best-practice recommendations

8.3 Application outlook

Website change monitoring has broad application prospects in the following areas:

By mastering these techniques, you can build a smart monitoring system tailored to your own needs; whether for competitive intelligence or personal convenience, it delivers real value. As artificial intelligence advances, future monitoring systems will become smarter and more precise, offering users an even better experience.

This concludes the article on monitoring website changes and sending automatic notifications with Python. For more on monitoring website changes with Python, please search 脚本之家's earlier articles or browse the related articles; we hope you will continue to support 脚本之家!
