python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python管理Windows补丁

Python脚本实现自动管理Windows补丁

作者:红魔Y

Windows补丁管理是安全合规的基础,但手动一台台打补丁简直是噩梦,本文就来教你用 Python 结合 wua模块和 PowerShell实现自动化的补丁扫描,下载,安装和合规报告生成吧

补丁管理的核心挑战

企业环境中 Windows 补丁管理面临的现实问题:

  1. 数量庞大:每月"补丁星期二"可能有 50+ 个补丁
  2. 影响评估难:补丁可能导致业务软件不兼容
  3. 回滚复杂:打完补丁出问题需要快速恢复
  4. 合规审计:等保、ISO 27001 要求补丁覆盖率 ≥ 95%
  5. 窗口期短:高危漏洞从发布到被利用可能只有几天

方案一:WUA COM 接口(核心方案)

Windows Update Agent (WUA) 是 Windows 内置的补丁管理 COM 组件,Python 通过 win32com 可以直接调用。

扫描可用更新

import win32com.client

def create_update_session():
    """创建 Windows Update Session"""
    return win32com.client.Dispatch("Microsoft.Update.Session")

def scan_for_updates():
    """
    扫描系统中可用的更新
    返回: (待安装列表, 可选安装列表)
    """
    session = create_update_session()
    searcher = session.CreateUpdateSearcher()

    print("正在扫描可用更新...(可能需要几分钟)")

    try:
        # 搜索所有可用更新
        result = searcher.Search("IsInstalled=0")

        pending = []   # 重要/推荐更新
        optional = []  # 可选更新

        for update in result.Updates:
            info = {
                "title": update.Title,
                "description": update.Description[:100] if update.Description else "",
                "kb": _extract_kb(update.Title),
                "severity": _get_severity(update),
                "size_mb": round(update.DownloadContents.TotalSize / 1024 / 1024, 1)
                            if update.DownloadContents.TotalSize > 0 else 0,
                "is_installed": update.IsInstalled,
            }

            if info["severity"] in ("Critical", "Important"):
                pending.append(info)
            else:
                optional.append(info)

        print(f"扫描完成!")
        print(f"  待安装(重要): {len(pending)} 个")
        print(f"  可选更新: {len(optional)} 个")
        print(f"  总大小: {sum(u['size_mb'] for u in pending + optional):.1f} MB")

        return pending, optional

    except Exception as e:
        print(f"扫描失败: {e}")
        return [], []


def _extract_kb(title):
    """从标题中提取 KB 编号"""
    import re
    match = re.search(r"(KB\d+)", title, re.IGNORECASE)
    return match.group(1) if match else ""


def _get_severity(update):
    """获取更新严重级别"""
    try:
        for category in update.Categories:
            name = category.Name.lower()
            if "critical" in name:
                return "Critical"
            if "important" in name:
                return "Important"
            if "moderate" in name:
                return "Moderate"
        return "Low"
    except Exception:
        return "Unknown"


# 使用
pending, optional = scan_for_updates()
print("\n重要更新列表:")
for u in pending[:10]:
    print(f"  [{u['severity']}] {u['title']}")
    print(f"    KB: {u['kb']}, 大小: {u['size_mb']} MB")

下载并安装更新

import time

def download_updates(updates_info, progress_callback=None):
    """
    下载指定的更新
    updates_info: 从 scan_for_updates 获取的列表
    """
    session = create_update_session()
    downloader = session.CreateUpdateDownloader()

    # 根据标题创建更新集合
    searcher = session.CreateUpdateSearcher()
    result = searcher.Search("IsInstalled=0")

    updates_to_download = win32com.client.Dispatch("Microsoft.Update.UpdateColl")
    for update in result.Updates:
        for info in updates_info:
            if update.Title == info["title"]:
                updates_to_download.Add(update)
                break

    if updates_to_download.Count == 0:
        print("没有需要下载的更新")
        return True

    downloader.Updates = updates_to_download

    print(f"开始下载 {updates_to_download.Count} 个更新...")

    try:
        download_result = downloader.Download()
        if download_result.ResultCode == 2:  # Succeeded
            print("下载完成!")
            return True
        else:
            print(f"下载失败,结果码: {download_result.ResultCode}")
            return False
    except Exception as e:
        print(f"下载出错: {e}")
        return False


def install_updates(updates_info, progress_callback=None):
    """
    安装已下载的更新
    注意:安装后可能需要重启
    """
    session = create_update_session()
    installer = session.CreateUpdateInstaller()

    # 获取已下载的更新
    searcher = session.CreateUpdateSearcher()
    result = searcher.Search("IsInstalled=0 AND IsDownloaded=1")

    updates_to_install = win32com.client.Dispatch("Microsoft.Update.UpdateColl")
    for update in result.Updates:
        for info in updates_info:
            if update.Title == info["title"]:
                updates_to_install.Add(update)
                break

    if updates_to_install.Count == 0:
        print("没有已下载的更新可供安装")
        return {"installed": 0, "failed": 0, "reboot_required": False}

    installer.Updates = updates_to_install

    print(f"开始安装 {updates_to_install.Count} 个更新...")
    print("⚠️ 此操作需要管理员权限!")

    try:
        install_result = installer.Install()

        installed = install_result.ResultCode
        reboot_required = installer.RebootRequired

        # 统计结果
        success_count = 0
        fail_count = 0

        for i in range(install_result.Updates.Count):
            result_code = install_result.GetUpdateResult(i).ResultCode
            update_title = install_result.Updates.Item(i).Title
            if result_code == 2:  # Succeeded
                success_count += 1
                print(f"  ✓ {update_title}")
            else:
                fail_count += 1
                print(f"  ✗ {update_title} (失败: {result_code})")

        print(f"\n安装完成: {success_count} 成功, {fail_count} 失败")
        if reboot_required:
            print("⚠️ 需要重启才能完成安装")

        return {
            "installed": success_count,
            "failed": fail_count,
            "reboot_required": reboot_required,
        }

    except Exception as e:
        print(f"安装出错: {e}")
        return {"installed": 0, "failed": 0, "reboot_required": False, "error": str(e)}


# 使用示例(请以管理员身份运行)
# pending, _ = scan_for_updates()
# download_updates(pending)
# install_updates(pending)

查看已安装补丁历史

def get_installed_updates(days=30):
    """
    获取最近安装的更新历史
    days: 查看最近多少天的安装记录
    """
    session = create_update_session()
    searcher = session.CreateUpdateHistory()

    from datetime import datetime, timedelta
    cutoff = datetime.now() - timedelta(days=days)

    history = []
    for entry in searcher:
        try:
            date_str = entry.Date
            install_date = datetime.strptime(
                str(date_str)[:10], "%Y-%m-%d"
            ) if date_str else None

            if install_date and install_date >= cutoff:
                history.append({
                    "title": entry.Title,
                    "date": install_date,
                    "result": "成功" if entry.ResultCode == 2 else "失败",
                    "kb": _extract_kb(entry.Title),
                })
        except Exception:
            continue

    # 按日期排序
    history.sort(key=lambda x: x["date"], reverse=True)

    print(f"最近 {days} 天安装的更新 ({len(history)} 个):\n")
    for h in history[:20]:
        print(f"  [{h['date'].strftime('%Y-%m-%d')}] {h['result']} "
              f"{h['title']}")

    return history


# 使用
# get_installed_updates(days=30)

方案二:PowerShell 桥接(无需 pywin32)

import subprocess
import json

def scan_updates_powershell():
    """通过 PowerShell 扫描可用更新"""
    ps_cmd = '''
    $UpdateSession = New-Object -ComObject Microsoft.Update.Session
    $UpdateSearcher = $UpdateSession.CreateUpdateSearcher()
    $SearchResult = $UpdateSearcher.Search("IsInstalled=0")

    $updates = @()
    foreach ($Update in $SearchResult.Updates) {
        $updates += @{
            Title = $Update.Title
            KB = if ($Update.Title -match 'KB(\d+)') { "KB$($Matches[1])" } else { "" }
            Size = [math]::Round($Update.DownloadContents.TotalSize / 1MB, 1)
            Severity = if ($Update.Categories | Where-Object { $_.Name -match 'Critical' }) { "Critical" } else { "Important" }
        }
    }

    $updates | ConvertTo-Json -Depth 3
    '''

    result = subprocess.run(
        ["powershell", "-Command", ps_cmd],
        capture_output=True, text=True, timeout=120
    )

    if result.returncode != 0:
        print(f"错误: {result.stderr}")
        return []

    try:
        data = json.loads(result.stdout)
        return data if isinstance(data, list) else [data]
    except json.JSONDecodeError:
        return []


def install_updates_powershell(kb_list=None):
    """
    通过 PowerShell 安装更新
    kb_list: 指定要安装的 KB 编号列表,None=全部安装
    """
    if kb_list:
        kb_filter = ", ".join(f'"{kb}"' for kb in kb_list)
        ps_cmd = f'''
        $criteria = "IsInstalled=0"
        $Session = New-Object -ComObject Microsoft.Update.Session
        $Searcher = $Session.CreateUpdateSearcher()
        $Result = $Searcher.Search($criteria)

        $Installer = $Session.CreateUpdateInstaller()
        $UpdatesToInstall = New-Object -ComObject Microsoft.Update.UpdateColl

        $kbList = @({kb_filter})
        foreach ($Update in $Result.Updates) {
            $kbMatch = $Update.Title -match 'KB(\d+)'
            if ($kbMatch -and $Matches[1] -in $kbList) {{
                $UpdatesToInstall.Add($Update)
            }}
        }}

        $Installer.Updates = $UpdatesToInstall
        $InstallResult = $Installer.Install()
        $InstallResult | ConvertTo-Json
        '''
    else:
        ps_cmd = r'''
        $Session = New-Object -ComObject Microsoft.Update.Session
        $Searcher = $Session.CreateUpdateSearcher()
        $Result = $Searcher.Search("IsInstalled=0 AND IsDownloaded=1")

        $Installer = $Session.CreateUpdateInstaller()
        $Installer.Updates = $Result.Updates
        $InstallResult = $Installer.Install()

        @{
            Installed = $InstallResult.ResultCode
            RebootRequired = $Installer.RebootRequired
            UpdatesCount = $Result.Updates.Count
        } | ConvertTo-Json
        '''

    result = subprocess.run(
        ["powershell", "-Command", ps_cmd],
        capture_output=True, text=True, timeout=600
    )

    print(result.stdout)
    return result.returncode == 0

方案三:批量远程补丁管理

核心场景——同时给 100 台电脑打补丁:

import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

def remote_patch_scan(
    computer,
    username=None,
    password=None
):
    """
    通过 PowerShell Remoting 远程扫描补丁
    """
    cred_part = ""
    if username and password:
        cred_part = (
            f'$secpass = ConvertTo-SecureString "{password}" -AsPlainText -Force; '
            f'$cred = New-Object System.Management.Automation.PSCredential("{username}", $secpass); '
        )

    ps_script = f'''
    {cred_part}
    $session = New-PSSession -ComputerName "{computer}" {f"-Credential $cred" if username else ""}
    Invoke-Command -Session $session -ScriptBlock {{
        $Searcher = New-Object -ComObject Microsoft.Update.Session
        $Searcher = $Searcher.CreateUpdateSearcher()
        $Result = $Searcher.Search("IsInstalled=0")

        @{{
            Computer = $env:COMPUTERNAME
            PendingUpdates = $Result.Updates.Count
            CriticalCount = ($Result.Updates | Where-Object {{
                $_.Categories.Name -match 'Critical'
            }}).Count
            LastScanDate = (Get-HotFix | Sort-Object InstalledOn -Descending | Select-Object -First 1).InstalledOn
        }} | ConvertTo-Json
    }}
    Remove-PSSession -Session $session -ErrorAction SilentlyContinue
    '''

    result = subprocess.run(
        ["powershell", "-Command", ps_script],
        capture_output=True, text=True, timeout=120
    )

    try:
        data = json.loads(result.stdout.strip().split('\n')[-1])
        return {"computer": computer, "status": "ok", **data}
    except (json.JSONDecodeError, IndexError):
        return {
            "computer": computer,
            "status": "error",
            "error": result.stderr.strip()
        }


def batch_scan_patches(
    computers,
    max_workers=10,
    username=None,
    password=None
):
    """批量扫描多台电脑的补丁状态"""
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                remote_patch_scan, pc, username, password
            ): pc for pc in computers
        }

        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            icon = "✓" if result["status"] == "ok" else "✗"
            if result["status"] == "ok":
                pending = result.get("PendingUpdates", "?")
                critical = result.get("CriticalCount", "?")
                print(f"  {icon} {result['computer']}: "
                      f"{pending} 待安装, {critical} 关键")
            else:
                print(f"  {icon} {result['computer']}: {result.get('error', '未知错误')}")

    return results


def generate_compliance_report(results, output_file="patch_compliance.html"):
    """生成补丁合规报告"""
    total = len(results)
    compliant = sum(
        1 for r in results
        if r["status"] == "ok" and r.get("PendingUpdates", 1) == 0
    )
    non_compliant = total - compliant

    # 超过 30 天未更新的机器
    stale = []
    for r in results:
        if r["status"] == "ok":
            last_scan = r.get("LastScanDate")
            if last_scan:
                try:
                    scan_date = datetime.strptime(
                        str(last_scan)[:10], "%Y-%m-%d"
                    )
                    if (datetime.now() - scan_date).days > 30:
                        stale.append(r["computer"])
                except (ValueError, TypeError):
                    pass

    compliance_rate = (compliant / total * 100) if total > 0 else 0

    html = f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>补丁合规报告</title>
    <style>
        body {{ font-family: 'Segoe UI', sans-serif; background: #f5f5f5; padding: 20px; }}
        .container {{ max-width: 1200px; margin: 0 auto; }}
        h1 {{ color: #333; }}
        .summary {{ display: grid; grid-template-columns: repeat(3, 1fr); gap: 20px; margin: 20px 0; }}
        .card {{ background: white; border-radius: 8px; padding: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); text-align: center; }}
        .card .number {{ font-size: 36px; font-weight: bold; }}
        .card.good .number {{ color: #27ae60; }}
        .card.bad .number {{ color: #e74c3c; }}
        .card.warn .number {{ color: #f39c12; }}
        table {{ width: 100%; background: white; border-radius: 8px; overflow: hidden; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
        th {{ background: #2c3e50; color: white; padding: 12px; text-align: left; }}
        td {{ padding: 10px 12px; border-bottom: 1px solid #eee; }}
        tr:hover {{ background: #f9f9f9; }}
        .status-ok {{ color: #27ae60; font-weight: bold; }}
        .status-warn {{ color: #e74c3c; font-weight: bold; }}
    </style>
</head>
<body>
    <div class="container">
        <h1>Windows 补丁合规报告</h1>
        <p>生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>

        <div class="summary">
            <div class="card good">
                <div class="number">{compliance_rate:.0f}%</div>
                <div>合规率</div>
            </div>
            <div class="card good">
                <div class="number">{compliant}</div>
                <div>已合规</div>
            </div>
            <div class="card bad">
                <div class="number">{non_compliant}</div>
                <div>待处理</div>
            </div>
        </div>

        <table>
            <tr>
                <th>计算机名</th>
                <th>待安装补丁</th>
                <th>关键补丁</th>
                <th>最后扫描日期</th>
                <th>状态</th>
            </tr>"""

    for r in sorted(results, key=lambda x: x.get("PendingUpdates", 0), reverse=True):
        if r["status"] == "ok":
            pending = r.get("PendingUpdates", 0)
            critical = r.get("CriticalCount", 0)
            last_scan = str(r.get("LastScanDate", "未知"))[:10]
            status_class = "status-warn" if pending > 0 else "status-ok"
            status_text = "待处理" if pending > 0 else "合规"
            html += f"""
            <tr>
                <td>{r['computer']}</td>
                <td>{pending}</td>
                <td>{critical}</td>
                <td>{last_scan}</td>
                <td class="{status_class}">{status_text}</td>
            </tr>"""

    html += """
        </table>
    </div>
</body>
</html>"""

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(html)

    print(f"\n合规报告已生成: {output_file}")
    print(f"合规率: {compliance_rate:.1f}% ({compliant}/{total})")
    return output_file


# 使用示例
# computers = [f"PC-{i:03d}" for i in range(1, 101)]
# results = batch_scan_patches(computers)
# generate_compliance_report(results)

实用工具:补丁回滚

补丁打出问题了?快速回滚:

def uninstall_update(kb_number):
    """
    卸载指定的 KB 补丁
    kb_number: 例如 "KB5005565"
    """
    # 方法1:使用 wusa 命令
    cmd = f"wusa /uninstall /kb:{kb_number.replace('KB', '')} /quiet /norestart"

    result = subprocess.run(
        cmd, shell=True, capture_output=True, text=True, timeout=300
    )

    if result.returncode == 0:
        print(f"补丁 {kb_number} 已卸载")
        print("建议重启电脑以完成卸载")
        return True
    else:
        print(f"卸载失败: {result.stderr}")
        return False


def list_installed_hotfixes(search_term=""):
    """列出已安装的补丁"""
    ps_cmd = f'Get-HotFix | Where-Object {{ $_.HotFixID -like "*{search_term}*" }} | Sort-Object InstalledOn -Descending | Select-Object -First 20 | Format-Table -AutoSize'

    result = subprocess.run(
        ["powershell", "-Command", ps_cmd],
        capture_output=True, text=True, timeout=30
    )

    print(result.stdout)
    return result.stdout


def check_vulnerability_status():
    """
    检查系统的已知漏洞状态
    通过对比已安装补丁和已知漏洞 KB 列表
    """
    # 常见的关键安全补丁列表(示例)
    critical_kbs = [
        ("KB5005565", "PrintNightmare 打印机漏洞"),
        ("KB5012170", "CVE-2022-26925 LSA 泄露"),
        ("KB5009543", "Active Directory 安全绕过"),
        ("KB5011048", "远程代码执行漏洞"),
    ]

    print("检查关键漏洞补丁状态:\n")

    # 获取已安装补丁列表
    result = subprocess.run(
        ["powershell", "-Command", "Get-HotFix | Select-Object HotFixID"],
        capture_output=True, text=True, timeout=30
    )
    installed = set(result.stdout.splitlines())

    for kb, desc in critical_kbs:
        status = "✓ 已安装" if kb in installed else "✗ 未安装"
        color = "安全" if kb in installed else "⚠️ 有风险"
        print(f"  {status} | {kb} | {desc} | {color}")

    return critical_kbs

补丁管理最佳实践

分批发布策略

def staged_deployment(computers, batch_size=10, delay_hours=24):
    """
    分批部署补丁
    先在第一批验证,确认无问题后再推后续批次
    """
    batches = [
        computers[i:i + batch_size]
        for i in range(0, len(computers), batch_size)
    ]

    print(f"共 {len(computers)} 台电脑,分为 {len(batches)} 批")
    print(f"每批 {batch_size} 台,间隔 {delay_hours} 小时\n")

    for i, batch in enumerate(batches, 1):
        print(f"=== 第 {i} 批 ({len(batch)} 台) ===")
        print(f"电脑: {', '.join(batch)}")

        # 第一批需要人工确认
        if i == 1:
            print("⚠️ 这是第一批,建议先在此批次验证补丁兼容性")
            confirm = input("确认安装?(y/n): ").strip().lower()
            if confirm != 'y':
                print("跳过此批次")
                continue

        # 执行安装(此处调用实际的安装函数)
        print(f"正在安装...\n")

        # 实际场景中,这里调用 remote_patch_install
        # 如果不是最后一批,等待确认
        if i < len(batches):
            print(f"第 {i} 批完成。等待 {delay_hours} 小时后继续下一批")
            print("期间请验证业务系统正常运行")

补丁策略建议

PATCH_POLICY = {
    "Critical": {
        "deploy_window": "72小时内",     # 关键补丁 3 天内部署
        "reboot_required": True,
        "approval": "自动",
    },
    "Important": {
        "deploy_window": "14天内",       # 重要补丁 2 周内部署
        "reboot_required": True,
        "approval": "测试后自动",
    },
    "Moderate": {
        "deploy_window": "30天内",       # 中等补丁 1 个月内
        "reboot_required": False,
        "approval": "人工审核",
    },
    "Low": {
        "deploy_window": "随下次更新周期",  # 低风险随缘
        "reboot_required": False,
        "approval": "人工审核",
    },
}

小结

需求方案命令/模块
本地扫描补丁win32com WUAMicrosoft.Update.Session
无 pywin32PowerShell 桥接Get-WinEvent, wusa
远程批量扫描PowerShell RemotingInvoke-Command -Session
安装补丁WUA Installerinstaller.Install()
卸载补丁wusa 命令wusa /uninstall /kb:xxx
合规报告HTML 生成自动统计合规率
回滚wusa / 系统还原wusa /uninstall

补丁管理是安全运营的基石。再忙也要打补丁——这不是建议,是底线。

到此这篇关于Python脚本实现自动管理Windows补丁的文章就介绍到这了,更多相关Python管理Windows补丁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文