python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python管理Windows域

Python脚本实现自动化管理Windows域

作者:红魔Y

这篇文章主要为大家详细介绍了如何通过Python 的 ldap3 和 PowerShell 实现域用户批量管理,组策略自动推送等操作,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下

新员工入职,IT 部门要在 AD 里创建账号、加到对应的组、分配 OU、推送组策略、安装软件……手动操作一个员工 20 分钟,100 个新人就是 33 小时。用 Python?脚本跑一遍,5 分钟。

为什么需要自动化域管理?

Active Directory (AD) 是企业 IT 基础设施的核心,几乎所有管理工作都围绕它展开:

操作手动方式Python 自动化
创建用户ADUC 里右键→新建→填表一行脚本
重置密码找到用户→右键→重置→输入批量脚本
查找锁定账户搜索→筛选→逐个查看一键列出
创建计算机账户ADUC→ Computers→新建批量导入
推送组策略GPMC→找到策略→链接脚本部署
软件部署组策略→软件安装→配置SCCM/PS 脚本

方案一:ldap3 库(纯 Python 方案)

ldap3 是一个纯 Python 实现的 LDAP 客户端,不需要安装任何 C 扩展。

安装

pip install ldap3

连接 AD 域控制器

from ldap3 import Server, Connection, ALL, SUBTREE, MODIFY_ADD, MODIFY_REPLACE
from ldap3.core.exceptions import LDAPException

class ADManager:
    """Active Directory 域管理器"""

    def __init__(self, domain, dc_host, username, password, use_ssl=True):
        """
        domain: 例如 "corp.example.com"
        dc_host: 域控制器地址,例如 "dc01.corp.example.com"
        username: 管理员账号,例如 "admin@corp.example.com"
        password: 密码
        """
        self.domain = domain
        self.server = Server(
            dc_host,
            use_ssl=use_ssl,
            get_info=ALL,
        )
        self.conn = Connection(
            self.server,
            user=username,
            password=password,
            authentication="NTLM",
            auto_bind=True,
        )

        if self.conn.bind():
            print(f"✓ 已连接到域控制器: {dc_host}")
        else:
            raise LDAPException(f"连接失败: {self.conn.last_error}")

        # 基础 DN
        self.base_dn = ",".join(
            f"DC={part}" for part in domain.split(".")
        )

    def search_users(self, filter_expr=None, attributes=None):
        """
        搜索域用户
        filter_expr: LDAP 过滤表达式
        attributes: 返回的属性列表
        """
        if filter_expr is None:
            filter_expr = "(objectClass=user)"

        if attributes is None:
            attributes = [
                "cn", "sAMAccountName", "mail",
                "department", "title", "whenCreated",
                "lastLogon", "userAccountControl",
                "memberOf", "distinguishedName",
            ]

        self.conn.search(
            search_base=self.base_dn,
            search_filter=filter_expr,
            search_scope=SUBTREE,
            attributes=attributes,
        )

        users = []
        for entry in self.conn.entries:
            user = {}
            for attr in attributes:
                try:
                    value = getattr(entry, attr, None)
                    user[attr] = str(value.value) if value else ""
                except Exception:
                    user[attr] = ""
            users.append(user)

        return users

    def close(self):
        """关闭连接"""
        self.conn.unbind()


# 使用
ad = ADManager(
    domain="corp.example.com",
    dc_host="dc01.corp.example.com",
    username="admin@corp.example.com",
    password="YourPassword"
)

# 搜索所有用户
users = ad.search_users()
print(f"域中共有 {len(users)} 个用户\n")

# 搜索特定部门
dev_users = ad.search_users(
    filter_expr="(&(objectClass=user)(department=Engineering))"
)
print(f"Engineering 部门: {len(dev_users)} 人")

ad.close()

用户管理 CRUD

class ADUserManager(ADManager):
    """AD 用户管理扩展"""

    def create_user(
        self,
        username,
        first_name,
        last_name,
        password,
        ou_path=None,
        department="",
        title="",
        email="",
        groups=None
    ):
        """
        创建域用户
        username: 登录名 (sAMAccountName)
        ou_path: OU 路径,例如 "OU=Users,OU=Shanghai,DC=corp,DC=example,DC=com"
        """
        if ou_path is None:
            ou_path = f"CN=Users,{self.base_dn}"

        dn = f"CN={username},{ou_path}"
        display_name = f"{first_name} {last_name}"

        # UPN (User Principal Name)
        upn = f"{username}@{self.domain}"

        # 默认密码需要符合策略
        default_password = password or "P@ssw0rd123!"

        try:
            self.conn.add(
                dn,
                attributes={
                    "objectClass": ["top", "person", "organizationalPerson", "user"],
                    "cn": username,
                    "sAMAccountName": username,
                    "userPrincipalName": upn,
                    "displayName": display_name,
                    "givenName": first_name,
                    "sn": last_name,
                    "mail": email or f"{username}@{self.domain}",
                    "department": department,
                    "title": title,
                    "userAccountControl": 512,  # 512 = 正常启用
                }
            )

            if self.conn.result["result"] == 0:
                print(f"✓ 用户创建成功: {username}")
                # 设置密码
                self._set_password(dn, default_password)
                # 加入组
                if groups:
                    for group in groups:
                        self.add_to_group(username, group)
                return True
            else:
                print(f"✗ 创建失败: {self.conn.result['description']}")
                return False

        except LDAPException as e:
            print(f"✗ 创建失败: {e}")
            return False

    def _set_password(self, user_dn, new_password):
        """设置用户密码"""
        try:
            # 注意:设置密码需要 SSL/TLS 连接
            self.conn.extend.microsoft.modify_password(
                user_dn, new_password
            )
            print(f"  密码已设置")
            return True
        except Exception as e:
            print(f"  密码设置失败: {e}")
            return False

    def reset_password(self, username, new_password):
        """重置用户密码"""
        user_dn = self._find_user_dn(username)
        if user_dn:
            return self._set_password(user_dn, new_password)
        return False

    def _find_user_dn(self, username):
        """根据用户名查找 DN"""
        self.conn.search(
            self.base_dn,
            f"(sAMAccountName={username})",
            attributes=["distinguishedName"],
        )
        if self.conn.entries:
            return str(self.conn.entries[0].distinguishedName)
        return None

    def disable_user(self, username):
        """禁用用户账户"""
        user_dn = self._find_user_dn(username)
        if not user_dn:
            print(f"用户不存在: {username}")
            return False

        # 514 = 禁用
        self.conn.modify(
            user_dn,
            {"userAccountControl": [(MODIFY_REPLACE, [514])]}
        )
        if self.conn.result["result"] == 0:
            print(f"✓ 已禁用: {username}")
            return True
        return False

    def enable_user(self, username):
        """启用用户账户"""
        user_dn = self._find_user_dn(username)
        if not user_dn:
            return False

        # 512 = 正常启用
        self.conn.modify(
            user_dn,
            {"userAccountControl": [(MODIFY_REPLACE, [512])]}
        )
        if self.conn.result["result"] == 0:
            print(f"✓ 已启用: {username}")
            return True
        return False

    def add_to_group(self, username, group_name):
        """将用户添加到组"""
        user_dn = self._find_user_dn(username)
        if not user_dn:
            return False

        group_dn = self._find_group_dn(group_name)
        if not group_dn:
            print(f"组不存在: {group_name}")
            return False

        self.conn.modify(
            group_dn,
            {"member": [(MODIFY_ADD, [user_dn])]}
        )
        if self.conn.result["result"] == 0:
            print(f"  ✓ {username} 已加入 {group_name}")
            return True
        print(f"  ✗ 加入组失败: {self.conn.result['description']}")
        return False

    def _find_group_dn(self, group_name):
        """根据组名查找 DN"""
        self.conn.search(
            self.base_dn,
            f"(&(objectClass=group)(cn={group_name}))",
            attributes=["distinguishedName"],
        )
        if self.conn.entries:
            return str(self.conn.entries[0].distinguishedName)
        return None


# 使用示例
# mgr = ADUserManager("corp.example.com", "dc01.corp.example.com", ...)
#
# 批量创建用户
# new_users = [
#     {"username": "zhangsan", "first_name": "三", "last_name": "张", "department": "Engineering", "groups": ["VPN-Users", "Dev-Team"]},
#     {"username": "lisi", "first_name": "四", "last_name": "李", "department": "HR", "groups": ["HR-Team"]},
# ]
# for u in new_users:
#     mgr.create_user(**u)

批量操作:从 CSV 导入

import csv

def bulk_create_users_from_csv(ad_manager, csv_file, default_password="P@ssw0rd123!"):
    """从 CSV 文件批量创建用户"""
    with open(csv_file, "r", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)

        results = {"success": 0, "failed": 0, "errors": []}

        for row in reader:
            username = row.get("username", "").strip()
            if not username:
                continue

            groups = [g.strip() for g in row.get("groups", "").split(",") if g.strip()]

            success = ad_manager.create_user(
                username=username,
                first_name=row.get("first_name", ""),
                last_name=row.get("last_name", ""),
                password=default_password,
                department=row.get("department", ""),
                title=row.get("title", ""),
                groups=groups if groups else None,
            )

            if success:
                results["success"] += 1
            else:
                results["failed"] += 1
                results["errors"].append(username)

    print(f"\n批量创建完成: {results['success']} 成功, {results['failed']} 失败")
    if results["errors"]:
        print(f"失败用户: {', '.join(results['errors'][:10])}")

    return results


# CSV 文件格式示例:
# username,first_name,last_name,department,title,groups
# zhangsan,三,张,Engineering,Developer,"VPN-Users,Dev-Team"
# lisi,四,李,HR,HR Specialist,HR-Team

方案二:PowerShell 桥接(无需 ldap3)

如果无法安装 ldap3,通过 PowerShell 的 ActiveDirectory 模块同样可以实现:

import subprocess
import json

class ADPowerShell:
    """通过 PowerShell 管理 AD"""

    def __init__(self):
        # 检查 ActiveDirectory 模块是否可用
        result = subprocess.run(
            ["powershell", "-Command",
             "Get-Module -ListAvailable ActiveDirectory"],
            capture_output=True, text=True
        )
        if not result.stdout.strip():
            print("⚠️ ActiveDirectory 模块未安装")
            print("请安装 RSAT (Remote Server Administration Tools)")

    def get_all_users(self, properties=None):
        """获取所有用户"""
        if properties is None:
            properties = "Name,SamAccountName,Department,Title,Enabled,LastLogonDate"

        ps_cmd = f'Get-ADUser -Filter * -Properties {properties} | Select-Object {properties} | ConvertTo-Json -Depth 3'

        result = subprocess.run(
            ["powershell", "-Command", ps_cmd],
            capture_output=True, text=True, timeout=60
        )

        try:
            data = json.loads(result.stdout)
            return data if isinstance(data, list) else [data]
        except json.JSONDecodeError:
            return []

    def create_user(self, username, display_name, ou="", department="", groups=None):
        """创建用户"""
        ou_part = f'-Path "{ou}"' if ou else ""

        cmd = f'''
        New-ADUser -Name "{display_name}" -SamAccountName "{username}" -UserPrincipalName "{username}@corp.example.com" -DisplayName "{display_name}" -Department "{department}" -AccountPassword (ConvertTo-SecureString "P@ssw0rd123!" -AsPlainText -Force) -Enabled $true {ou_part}
        '''

        result = subprocess.run(
            ["powershell", "-Command", cmd],
            capture_output=True, text=True
        )

        if result.returncode == 0:
            print(f"✓ 创建用户: {username}")

            # 加入组
            if groups:
                for group in groups.split(","):
                    self.add_to_group(username, group.strip())
            return True
        else:
            print(f"✗ 创建失败: {result.stderr}")
            return False

    def add_to_group(self, username, group_name):
        """将用户添加到组"""
        cmd = f'Add-ADGroupMember -Identity "{group_name}" -Members "{username}"'

        result = subprocess.run(
            ["powershell", "-Command", cmd],
            capture_output=True, text=True
        )

        if result.returncode == 0:
            print(f"  ✓ 加入组: {group_name}")
            return True
        print(f"  ✗ 加入组失败: {group_name}")
        return False

    def find_locked_users(self):
        """查找被锁定的用户"""
        ps_cmd = '''
        Search-ADAccount -LockedOut | Select-Object SamAccountName,Name,LastLogonDate,LockoutTime | ConvertTo-Json -Depth 3
        '''

        result = subprocess.run(
            ["powershell", "-Command", ps_cmd],
            capture_output=True, text=True
        )

        try:
            data = json.loads(result.stdout)
            users = data if isinstance(data, list) else [data]
            print(f"\n🔒 被锁定的用户 ({len(users)} 个):")
            for u in users:
                print(f"  {u['SamAccountName']} ({u['Name']})")
            return users
        except json.JSONDecodeError:
            print("无被锁定的用户")
            return []

    def find_inactive_users(self, days=90):
        """查找长时间未登录的用户"""
        ps_cmd = f'''
        Search-ADAccount -AccountInactive -TimeSpan {days} | Where-Object {{ $_.Enabled -eq $true }} | Select-Object SamAccountName,Name,LastLogonDate,DistinguishedName | ConvertTo-Json -Depth 3
        '''

        result = subprocess.run(
            ["powershell", "-Command", ps_cmd],
            capture_output=True, text=True, timeout=60
        )

        try:
            data = json.loads(result.stdout)
            users = data if isinstance(data, list) else [data]
            print(f"\n😴 超过 {days} 天未登录的用户 ({len(users)} 个):")
            for u in users:
                last_logon = u.get("LastLogonDate", "从未登录")
                print(f"  {u['SamAccountName']} | 最后登录: {last_logon}")
            return users
        except json.JSONDecodeError:
            return []

    def unlock_user(self, username):
        """解锁用户"""
        cmd = f'Unlock-ADAccount -Identity "{username}"'
        result = subprocess.run(
            ["powershell", "-Command", cmd],
            capture_output=True, text=True
        )
        if result.returncode == 0:
            print(f"✓ 已解锁: {username}")
            return True
        print(f"✗ 解锁失败: {username}")
        return False

    def reset_password(self, username, new_password):
        """重置密码"""
        cmd = f'''
        Set-ADAccountPassword -Identity "{username}" -Reset -NewPassword (ConvertTo-SecureString "{new_password}" -AsPlainText -Force)
        '''
        result = subprocess.run(
            ["powershell", "-Command", cmd],
            capture_output=True, text=True
        )
        if result.returncode == 0:
            print(f"✓ 密码已重置: {username}")
            # 强制下次登录修改密码
            self.conn = subprocess.run(
                ["powershell", "-Command", f'Set-ADUser -Identity "{username}" -ChangePasswordAtLogon $true'],
                capture_output=True, text=True
            )
            return True
        print(f"✗ 密码重置失败: {username}")
        return False


# 使用
# ad = ADPowerShell()
# ad.find_locked_users()
# ad.find_inactive_users(days=90)
# ad.unlock_user("zhangsan")
# ad.reset_password("zhangsan", "NewP@ssw0rd!")

实战:组策略自动化

def deploy_software_via_gpo(
    gpo_name,
    software_path,
    gpo_target_ou
):
    """
    通过组策略部署软件
    gpo_name: GPO 名称
    software_path: MSI 或 EXE 安装包路径
    gpo_target_ou: 目标 OU
    """
    ps_cmd = f'''
    # 创建新的 GPO
    $gpo = New-GPO -Name "{gpo_name}"

    # 配置软件安装
    # 方法:使用 GroupPolicy 模块
    Import-Module GroupPolicy

    # 将 GPO 链接到目标 OU
    New-GPLink -Name "{gpo_name}" -Target "{gpo_target_ou}" -LinkEnabled Yes

    Write-Output "GPO '{gpo_name}' 已创建并链接到 '{gpo_target_ou}'"
    '''

    result = subprocess.run(
        ["powershell", "-Command", ps_cmd],
        capture_output=True, text=True
    )

    print(result.stdout)
    if result.returncode == 0:
        print("✓ GPO 部署成功")
        return True
    print(f"✗ 部署失败: {result.stderr}")
    return False


def list_gpos():
    """列出所有组策略"""
    ps_cmd = 'Get-GPO -All | Select-Object DisplayName,Id,GpoStatus,CreationTime | ConvertTo-Json -Depth 3'

    result = subprocess.run(
        ["powershell", "-Command", ps_cmd],
        capture_output=True, text=True
    )

    try:
        data = json.loads(result.stdout)
        gpos = data if isinstance(data, list) else [data]
        print(f"\n组策略列表 ({len(gpos)} 个):\n")
        for gpo in gpos:
            status = gpo.get("GpoStatus", "")
            print(f"  {gpo['DisplayName']} | {status} | {gpo.get('CreationTime', '')[:10]}")
        return gpos
    except json.JSONDecodeError:
        return []

实战:计算机账户管理

def get_computer_inventory(ad_conn=None, ps_method=True):
    """获取域内计算机清单"""
    ps_cmd = '''
    Get-ADComputer -Filter * -Properties Name,OperatingSystem,OperatingSystemVersion,
        LastLogonDate,IPv4Address,Enabled |
    Select-Object Name,OperatingSystem,OperatingSystemVersion,
        LastLogonDate,IPv4Address,Enabled |
    ConvertTo-Json -Depth 3
    '''

    result = subprocess.run(
        ["powershell", "-Command", ps_cmd],
        capture_output=True, text=True, timeout=60
    )

    try:
        data = json.loads(result.stdout)
        computers = data if isinstance(data, list) else [data]

        print(f"\n域内计算机清单 ({len(computers)} 台):\n")
        print(f"{'计算机名':<20}{'操作系统':<25}{'最后登录':<12}{'IP地址':<16}{'状态'}")
        print("-" * 90)

        os_counter = Counter()
        for c in computers:
            name = c.get("Name", "")
            os_name = c.get("OperatingSystem", "")
            os_version = c.get("OperatingSystemVersion", "")
            os_counter[os_name] += 1
            last_logon = str(c.get("LastLogonDate", "从未"))[:10]
            ip = c.get("IPv4Address", "")
            enabled = "✓" if c.get("Enabled") else "✗"
            print(f"{name:<20}{os_name:<25}{last_logon:<12}{ip:<16}{enabled}")

        print(f"\n操作系统分布:")
        for os_name, count in os_counter.most_common():
            print(f"  {os_name}: {count} 台")

        return computers
    except json.JSONDecodeError:
        return []


from collections import Counter

# 使用
# get_computer_inventory()

实战:安全审计脚本

def security_audit():
    """AD 安全审计"""
    print("=" * 60)
    print("  Active Directory 安全审计报告")
    print("=" * 60)

    # 1. 域管理员账户
    print("\n=== 域管理员账户 ===")
    ps_cmd = 'Get-ADGroupMember -Identity "Domain Admins" | Select-Object Name,SamAccountName | ConvertTo-Json'
    result = subprocess.run(["powershell", "-Command", ps_cmd], capture_output=True, text=True)
    try:
        admins = json.loads(result.stdout)
        admins = admins if isinstance(admins, list) else [admins]
        print(f"域管理员数量: {len(admins)}")
        for a in admins:
            print(f"  - {a.get('SamAccountName', a.get('Name', 'Unknown'))}")
    except json.JSONDecodeError:
        pass

    # 2. 被锁定的账户
    print("\n=== 被锁定账户 ===")
    ps_cmd = 'Search-ADAccount -LockedOut | Select-Object SamAccountName | ConvertTo-Json'
    result = subprocess.run(["powershell", "-Command", ps_cmd], capture_output=True, text=True)
    try:
        locked = json.loads(result.stdout)
        locked = locked if isinstance(locked, list) else [locked]
        print(f"锁定账户: {len(locked)}")
        for a in locked:
            print(f"  - {a.get('SamAccountName')}")
    except json.JSONDecodeError:
        print("无锁定账户")

    # 3. 密码永不过期的账户
    print("\n=== 密码永不过期的账户 ===")
    ps_cmd = 'Get-ADUser -Filter * -Properties PasswordNeverExpires | Where-Object {$_.PasswordNeverExpires -eq $true -and $_.Enabled -eq $true} | Select-Object SamAccountName | ConvertTo-Json'
    result = subprocess.run(["powershell", "-Command", ps_cmd], capture_output=True, text=True, timeout=30)
    try:
        never_expire = json.loads(result.stdout)
        never_expire = never_expire if isinstance(never_expire, list) else [never_expire]
        print(f"永不过期账户: {len(never_expire)}")
        for a in never_expire[:20]:
            print(f"  - {a.get('SamAccountName')}")
        if len(never_expire) > 20:
            print(f"  ... 还有 {len(never_expire) - 20} 个")
    except json.JSONDecodeError:
        print("无(或查询失败)")

    # 4. 不活跃账户
    print("\n=== 90天未登录的活跃账户 ===")
    ps_cmd = 'Search-ADAccount -AccountInactive -TimeSpan 90 | Where-Object {$_.Enabled -eq $true} | Select-Object SamAccountName,LastLogonDate | ConvertTo-Json -Depth 3'
    result = subprocess.run(["powershell", "-Command", ps_cmd], capture_output=True, text=True, timeout=30)
    try:
        inactive = json.loads(result.stdout)
        inactive = inactive if isinstance(inactive, list) else [inactive]
        print(f"不活跃账户: {len(inactive)}")
        for a in inactive[:10]:
            print(f"  - {a.get('SamAccountName')} (最后登录: {a.get('LastLogonDate', '从未')})")
    except json.JSONDecodeError:
        print("无(或查询失败)")

    print("\n" + "=" * 60)


# 使用(需要管理员权限)
# security_audit()

小结

需求方案模块
查询用户ldap3 / PowerShellsearch_users / Get-ADUser
创建用户ldap3 / PowerShellconn.add / New-ADUser
重置密码ldap3 / PowerShellmodify_password / Set-ADAccountPassword
解锁账户PowerShellUnlock-ADAccount
组管理ldap3 / PowerShellMODIFY_ADD / Add-ADGroupMember
安全审计PowerShellSearch-ADAccount
软件部署GPONew-GPO + New-GPLink
批量导入CSV + 循环bulk_create_users_from_csv

Windows 域管理是 IT 运维的高阶技能。掌握了这些自动化方法,你可以在几分钟内完成原来需要数天的工作。

这个系列到目前为止已经覆盖了 Windows Python 运维的核心场景:

  1. Python vs Linux 运维的 5 个坑
  2. WMI + COM 实战手册
  3. 注册表自动化运维
  4. 事件日志分析
  5. 补丁管理
  6. 磁盘空间监控
  7. AD 域管理(本文)

到此这篇关于Python脚本实现自动化管理Windows域的文章就介绍到这了,更多相关Python管理Windows域内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文