Python脚本实现自动化管理Windows域
作者:红魔Y
这篇文章主要为大家详细介绍了如何通过Python 的 ldap3 和 PowerShell 实现域用户批量管理,组策略自动推送等操作,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下
新员工入职,IT 部门要在 AD 里创建账号、加到对应的组、分配 OU、推送组策略、安装软件……手动操作一个员工 20 分钟,100 个新人就是 33 小时。用 Python?脚本跑一遍,5 分钟。
为什么需要自动化域管理?
Active Directory (AD) 是企业 IT 基础设施的核心,几乎所有管理工作都围绕它展开:
| 操作 | 手动方式 | Python 自动化 |
|---|---|---|
| 创建用户 | ADUC 里右键→新建→填表 | 一行脚本 |
| 重置密码 | 找到用户→右键→重置→输入 | 批量脚本 |
| 查找锁定账户 | 搜索→筛选→逐个查看 | 一键列出 |
| 创建计算机账户 | ADUC→ Computers→新建 | 批量导入 |
| 推送组策略 | GPMC→找到策略→链接 | 脚本部署 |
| 软件部署 | 组策略→软件安装→配置 | SCCM/PS 脚本 |
方案一:ldap3 库(纯 Python 方案)
ldap3 是一个纯 Python 实现的 LDAP 客户端,不需要安装任何 C 扩展。
安装
pip install ldap3
连接 AD 域控制器
from ldap3 import Server, Connection, ALL, SUBTREE, MODIFY_ADD, MODIFY_REPLACE
from ldap3.core.exceptions import LDAPException
class ADManager:
"""Active Directory 域管理器"""
def __init__(self, domain, dc_host, username, password, use_ssl=True):
"""
domain: 例如 "corp.example.com"
dc_host: 域控制器地址,例如 "dc01.corp.example.com"
username: 管理员账号,例如 "admin@corp.example.com"
password: 密码
"""
self.domain = domain
self.server = Server(
dc_host,
use_ssl=use_ssl,
get_info=ALL,
)
self.conn = Connection(
self.server,
user=username,
password=password,
authentication="NTLM",
auto_bind=True,
)
if self.conn.bind():
print(f"✓ 已连接到域控制器: {dc_host}")
else:
raise LDAPException(f"连接失败: {self.conn.last_error}")
# 基础 DN
self.base_dn = ",".join(
f"DC={part}" for part in domain.split(".")
)
def search_users(self, filter_expr=None, attributes=None):
"""
搜索域用户
filter_expr: LDAP 过滤表达式
attributes: 返回的属性列表
"""
if filter_expr is None:
filter_expr = "(objectClass=user)"
if attributes is None:
attributes = [
"cn", "sAMAccountName", "mail",
"department", "title", "whenCreated",
"lastLogon", "userAccountControl",
"memberOf", "distinguishedName",
]
self.conn.search(
search_base=self.base_dn,
search_filter=filter_expr,
search_scope=SUBTREE,
attributes=attributes,
)
users = []
for entry in self.conn.entries:
user = {}
for attr in attributes:
try:
value = getattr(entry, attr, None)
user[attr] = str(value.value) if value else ""
except Exception:
user[attr] = ""
users.append(user)
return users
def close(self):
"""关闭连接"""
self.conn.unbind()
# 使用
ad = ADManager(
domain="corp.example.com",
dc_host="dc01.corp.example.com",
username="admin@corp.example.com",
password="YourPassword"
)
# 搜索所有用户
users = ad.search_users()
print(f"域中共有 {len(users)} 个用户\n")
# 搜索特定部门
dev_users = ad.search_users(
filter_expr="(&(objectClass=user)(department=Engineering))"
)
print(f"Engineering 部门: {len(dev_users)} 人")
ad.close()
用户管理 CRUD
class ADUserManager(ADManager):
"""AD 用户管理扩展"""
def create_user(
self,
username,
first_name,
last_name,
password,
ou_path=None,
department="",
title="",
email="",
groups=None
):
"""
创建域用户
username: 登录名 (sAMAccountName)
ou_path: OU 路径,例如 "OU=Users,OU=Shanghai,DC=corp,DC=example,DC=com"
"""
if ou_path is None:
ou_path = f"CN=Users,{self.base_dn}"
dn = f"CN={username},{ou_path}"
display_name = f"{first_name} {last_name}"
# UPN (User Principal Name)
upn = f"{username}@{self.domain}"
# 默认密码需要符合策略
default_password = password or "P@ssw0rd123!"
try:
self.conn.add(
dn,
attributes={
"objectClass": ["top", "person", "organizationalPerson", "user"],
"cn": username,
"sAMAccountName": username,
"userPrincipalName": upn,
"displayName": display_name,
"givenName": first_name,
"sn": last_name,
"mail": email or f"{username}@{self.domain}",
"department": department,
"title": title,
"userAccountControl": 512, # 512 = 正常启用
}
)
if self.conn.result["result"] == 0:
print(f"✓ 用户创建成功: {username}")
# 设置密码
self._set_password(dn, default_password)
# 加入组
if groups:
for group in groups:
self.add_to_group(username, group)
return True
else:
print(f"✗ 创建失败: {self.conn.result['description']}")
return False
except LDAPException as e:
print(f"✗ 创建失败: {e}")
return False
def _set_password(self, user_dn, new_password):
"""设置用户密码"""
try:
# 注意:设置密码需要 SSL/TLS 连接
self.conn.extend.microsoft.modify_password(
user_dn, new_password
)
print(f" 密码已设置")
return True
except Exception as e:
print(f" 密码设置失败: {e}")
return False
def reset_password(self, username, new_password):
"""重置用户密码"""
user_dn = self._find_user_dn(username)
if user_dn:
return self._set_password(user_dn, new_password)
return False
def _find_user_dn(self, username):
"""根据用户名查找 DN"""
self.conn.search(
self.base_dn,
f"(sAMAccountName={username})",
attributes=["distinguishedName"],
)
if self.conn.entries:
return str(self.conn.entries[0].distinguishedName)
return None
def disable_user(self, username):
"""禁用用户账户"""
user_dn = self._find_user_dn(username)
if not user_dn:
print(f"用户不存在: {username}")
return False
# 514 = 禁用
self.conn.modify(
user_dn,
{"userAccountControl": [(MODIFY_REPLACE, [514])]}
)
if self.conn.result["result"] == 0:
print(f"✓ 已禁用: {username}")
return True
return False
def enable_user(self, username):
"""启用用户账户"""
user_dn = self._find_user_dn(username)
if not user_dn:
return False
# 512 = 正常启用
self.conn.modify(
user_dn,
{"userAccountControl": [(MODIFY_REPLACE, [512])]}
)
if self.conn.result["result"] == 0:
print(f"✓ 已启用: {username}")
return True
return False
def add_to_group(self, username, group_name):
"""将用户添加到组"""
user_dn = self._find_user_dn(username)
if not user_dn:
return False
group_dn = self._find_group_dn(group_name)
if not group_dn:
print(f"组不存在: {group_name}")
return False
self.conn.modify(
group_dn,
{"member": [(MODIFY_ADD, [user_dn])]}
)
if self.conn.result["result"] == 0:
print(f" ✓ {username} 已加入 {group_name}")
return True
print(f" ✗ 加入组失败: {self.conn.result['description']}")
return False
def _find_group_dn(self, group_name):
"""根据组名查找 DN"""
self.conn.search(
self.base_dn,
f"(&(objectClass=group)(cn={group_name}))",
attributes=["distinguishedName"],
)
if self.conn.entries:
return str(self.conn.entries[0].distinguishedName)
return None
# 使用示例
# mgr = ADUserManager("corp.example.com", "dc01.corp.example.com", ...)
#
# 批量创建用户
# new_users = [
# {"username": "zhangsan", "first_name": "三", "last_name": "张", "department": "Engineering", "groups": ["VPN-Users", "Dev-Team"]},
# {"username": "lisi", "first_name": "四", "last_name": "李", "department": "HR", "groups": ["HR-Team"]},
# ]
# for u in new_users:
# mgr.create_user(**u)
批量操作:从 CSV 导入
import csv
def bulk_create_users_from_csv(ad_manager, csv_file, default_password="P@ssw0rd123!"):
"""从 CSV 文件批量创建用户"""
with open(csv_file, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
results = {"success": 0, "failed": 0, "errors": []}
for row in reader:
username = row.get("username", "").strip()
if not username:
continue
groups = [g.strip() for g in row.get("groups", "").split(",") if g.strip()]
success = ad_manager.create_user(
username=username,
first_name=row.get("first_name", ""),
last_name=row.get("last_name", ""),
password=default_password,
department=row.get("department", ""),
title=row.get("title", ""),
groups=groups if groups else None,
)
if success:
results["success"] += 1
else:
results["failed"] += 1
results["errors"].append(username)
print(f"\n批量创建完成: {results['success']} 成功, {results['failed']} 失败")
if results["errors"]:
print(f"失败用户: {', '.join(results['errors'][:10])}")
return results
# CSV 文件格式示例:
# username,first_name,last_name,department,title,groups
# zhangsan,三,张,Engineering,Developer,"VPN-Users,Dev-Team"
# lisi,四,李,HR,HR Specialist,HR-Team
方案二:PowerShell 桥接(无需 ldap3)
如果无法安装 ldap3,通过 PowerShell 的 ActiveDirectory 模块同样可以实现:
import subprocess
import json
class ADPowerShell:
"""通过 PowerShell 管理 AD"""
def __init__(self):
# 检查 ActiveDirectory 模块是否可用
result = subprocess.run(
["powershell", "-Command",
"Get-Module -ListAvailable ActiveDirectory"],
capture_output=True, text=True
)
if not result.stdout.strip():
print("⚠️ ActiveDirectory 模块未安装")
print("请安装 RSAT (Remote Server Administration Tools)")
def get_all_users(self, properties=None):
"""获取所有用户"""
if properties is None:
properties = "Name,SamAccountName,Department,Title,Enabled,LastLogonDate"
ps_cmd = f'Get-ADUser -Filter * -Properties {properties} | Select-Object {properties} | ConvertTo-Json -Depth 3'
result = subprocess.run(
["powershell", "-Command", ps_cmd],
capture_output=True, text=True, timeout=60
)
try:
data = json.loads(result.stdout)
return data if isinstance(data, list) else [data]
except json.JSONDecodeError:
return []
def create_user(self, username, display_name, ou="", department="", groups=None):
"""创建用户"""
ou_part = f'-Path "{ou}"' if ou else ""
cmd = f'''
New-ADUser -Name "{display_name}" -SamAccountName "{username}" -UserPrincipalName "{username}@corp.example.com" -DisplayName "{display_name}" -Department "{department}" -AccountPassword (ConvertTo-SecureString "P@ssw0rd123!" -AsPlainText -Force) -Enabled $true {ou_part}
'''
result = subprocess.run(
["powershell", "-Command", cmd],
capture_output=True, text=True
)
if result.returncode == 0:
print(f"✓ 创建用户: {username}")
# 加入组
if groups:
for group in groups.split(","):
self.add_to_group(username, group.strip())
return True
else:
print(f"✗ 创建失败: {result.stderr}")
return False
def add_to_group(self, username, group_name):
"""将用户添加到组"""
cmd = f'Add-ADGroupMember -Identity "{group_name}" -Members "{username}"'
result = subprocess.run(
["powershell", "-Command", cmd],
capture_output=True, text=True
)
if result.returncode == 0:
print(f" ✓ 加入组: {group_name}")
return True
print(f" ✗ 加入组失败: {group_name}")
return False
def find_locked_users(self):
"""查找被锁定的用户"""
ps_cmd = '''
Search-ADAccount -LockedOut | Select-Object SamAccountName,Name,LastLogonDate,LockoutTime | ConvertTo-Json -Depth 3
'''
result = subprocess.run(
["powershell", "-Command", ps_cmd],
capture_output=True, text=True
)
try:
data = json.loads(result.stdout)
users = data if isinstance(data, list) else [data]
print(f"\n🔒 被锁定的用户 ({len(users)} 个):")
for u in users:
print(f" {u['SamAccountName']} ({u['Name']})")
return users
except json.JSONDecodeError:
print("无被锁定的用户")
return []
def find_inactive_users(self, days=90):
"""查找长时间未登录的用户"""
ps_cmd = f'''
Search-ADAccount -AccountInactive -TimeSpan {days} | Where-Object {{ $_.Enabled -eq $true }} | Select-Object SamAccountName,Name,LastLogonDate,DistinguishedName | ConvertTo-Json -Depth 3
'''
result = subprocess.run(
["powershell", "-Command", ps_cmd],
capture_output=True, text=True, timeout=60
)
try:
data = json.loads(result.stdout)
users = data if isinstance(data, list) else [data]
print(f"\n😴 超过 {days} 天未登录的用户 ({len(users)} 个):")
for u in users:
last_logon = u.get("LastLogonDate", "从未登录")
print(f" {u['SamAccountName']} | 最后登录: {last_logon}")
return users
except json.JSONDecodeError:
return []
def unlock_user(self, username):
"""解锁用户"""
cmd = f'Unlock-ADAccount -Identity "{username}"'
result = subprocess.run(
["powershell", "-Command", cmd],
capture_output=True, text=True
)
if result.returncode == 0:
print(f"✓ 已解锁: {username}")
return True
print(f"✗ 解锁失败: {username}")
return False
def reset_password(self, username, new_password):
"""重置密码"""
cmd = f'''
Set-ADAccountPassword -Identity "{username}" -Reset -NewPassword (ConvertTo-SecureString "{new_password}" -AsPlainText -Force)
'''
result = subprocess.run(
["powershell", "-Command", cmd],
capture_output=True, text=True
)
if result.returncode == 0:
print(f"✓ 密码已重置: {username}")
# 强制下次登录修改密码
self.conn = subprocess.run(
["powershell", "-Command", f'Set-ADUser -Identity "{username}" -ChangePasswordAtLogon $true'],
capture_output=True, text=True
)
return True
print(f"✗ 密码重置失败: {username}")
return False
# 使用
# ad = ADPowerShell()
# ad.find_locked_users()
# ad.find_inactive_users(days=90)
# ad.unlock_user("zhangsan")
# ad.reset_password("zhangsan", "NewP@ssw0rd!")
实战:组策略自动化
def deploy_software_via_gpo(
gpo_name,
software_path,
gpo_target_ou
):
"""
通过组策略部署软件
gpo_name: GPO 名称
software_path: MSI 或 EXE 安装包路径
gpo_target_ou: 目标 OU
"""
ps_cmd = f'''
# 创建新的 GPO
$gpo = New-GPO -Name "{gpo_name}"
# 配置软件安装
# 方法:使用 GroupPolicy 模块
Import-Module GroupPolicy
# 将 GPO 链接到目标 OU
New-GPLink -Name "{gpo_name}" -Target "{gpo_target_ou}" -LinkEnabled Yes
Write-Output "GPO '{gpo_name}' 已创建并链接到 '{gpo_target_ou}'"
'''
result = subprocess.run(
["powershell", "-Command", ps_cmd],
capture_output=True, text=True
)
print(result.stdout)
if result.returncode == 0:
print("✓ GPO 部署成功")
return True
print(f"✗ 部署失败: {result.stderr}")
return False
def list_gpos():
"""列出所有组策略"""
ps_cmd = 'Get-GPO -All | Select-Object DisplayName,Id,GpoStatus,CreationTime | ConvertTo-Json -Depth 3'
result = subprocess.run(
["powershell", "-Command", ps_cmd],
capture_output=True, text=True
)
try:
data = json.loads(result.stdout)
gpos = data if isinstance(data, list) else [data]
print(f"\n组策略列表 ({len(gpos)} 个):\n")
for gpo in gpos:
status = gpo.get("GpoStatus", "")
print(f" {gpo['DisplayName']} | {status} | {gpo.get('CreationTime', '')[:10]}")
return gpos
except json.JSONDecodeError:
return []
实战:计算机账户管理
def get_computer_inventory(ad_conn=None, ps_method=True):
"""获取域内计算机清单"""
ps_cmd = '''
Get-ADComputer -Filter * -Properties Name,OperatingSystem,OperatingSystemVersion,
LastLogonDate,IPv4Address,Enabled |
Select-Object Name,OperatingSystem,OperatingSystemVersion,
LastLogonDate,IPv4Address,Enabled |
ConvertTo-Json -Depth 3
'''
result = subprocess.run(
["powershell", "-Command", ps_cmd],
capture_output=True, text=True, timeout=60
)
try:
data = json.loads(result.stdout)
computers = data if isinstance(data, list) else [data]
print(f"\n域内计算机清单 ({len(computers)} 台):\n")
print(f"{'计算机名':<20}{'操作系统':<25}{'最后登录':<12}{'IP地址':<16}{'状态'}")
print("-" * 90)
os_counter = Counter()
for c in computers:
name = c.get("Name", "")
os_name = c.get("OperatingSystem", "")
os_version = c.get("OperatingSystemVersion", "")
os_counter[os_name] += 1
last_logon = str(c.get("LastLogonDate", "从未"))[:10]
ip = c.get("IPv4Address", "")
enabled = "✓" if c.get("Enabled") else "✗"
print(f"{name:<20}{os_name:<25}{last_logon:<12}{ip:<16}{enabled}")
print(f"\n操作系统分布:")
for os_name, count in os_counter.most_common():
print(f" {os_name}: {count} 台")
return computers
except json.JSONDecodeError:
return []
from collections import Counter
# 使用
# get_computer_inventory()
实战:安全审计脚本
def security_audit():
"""AD 安全审计"""
print("=" * 60)
print(" Active Directory 安全审计报告")
print("=" * 60)
# 1. 域管理员账户
print("\n=== 域管理员账户 ===")
ps_cmd = 'Get-ADGroupMember -Identity "Domain Admins" | Select-Object Name,SamAccountName | ConvertTo-Json'
result = subprocess.run(["powershell", "-Command", ps_cmd], capture_output=True, text=True)
try:
admins = json.loads(result.stdout)
admins = admins if isinstance(admins, list) else [admins]
print(f"域管理员数量: {len(admins)}")
for a in admins:
print(f" - {a.get('SamAccountName', a.get('Name', 'Unknown'))}")
except json.JSONDecodeError:
pass
# 2. 被锁定的账户
print("\n=== 被锁定账户 ===")
ps_cmd = 'Search-ADAccount -LockedOut | Select-Object SamAccountName | ConvertTo-Json'
result = subprocess.run(["powershell", "-Command", ps_cmd], capture_output=True, text=True)
try:
locked = json.loads(result.stdout)
locked = locked if isinstance(locked, list) else [locked]
print(f"锁定账户: {len(locked)}")
for a in locked:
print(f" - {a.get('SamAccountName')}")
except json.JSONDecodeError:
print("无锁定账户")
# 3. 密码永不过期的账户
print("\n=== 密码永不过期的账户 ===")
ps_cmd = 'Get-ADUser -Filter * -Properties PasswordNeverExpires | Where-Object {$_.PasswordNeverExpires -eq $true -and $_.Enabled -eq $true} | Select-Object SamAccountName | ConvertTo-Json'
result = subprocess.run(["powershell", "-Command", ps_cmd], capture_output=True, text=True, timeout=30)
try:
never_expire = json.loads(result.stdout)
never_expire = never_expire if isinstance(never_expire, list) else [never_expire]
print(f"永不过期账户: {len(never_expire)}")
for a in never_expire[:20]:
print(f" - {a.get('SamAccountName')}")
if len(never_expire) > 20:
print(f" ... 还有 {len(never_expire) - 20} 个")
except json.JSONDecodeError:
print("无(或查询失败)")
# 4. 不活跃账户
print("\n=== 90天未登录的活跃账户 ===")
ps_cmd = 'Search-ADAccount -AccountInactive -TimeSpan 90 | Where-Object {$_.Enabled -eq $true} | Select-Object SamAccountName,LastLogonDate | ConvertTo-Json -Depth 3'
result = subprocess.run(["powershell", "-Command", ps_cmd], capture_output=True, text=True, timeout=30)
try:
inactive = json.loads(result.stdout)
inactive = inactive if isinstance(inactive, list) else [inactive]
print(f"不活跃账户: {len(inactive)}")
for a in inactive[:10]:
print(f" - {a.get('SamAccountName')} (最后登录: {a.get('LastLogonDate', '从未')})")
except json.JSONDecodeError:
print("无(或查询失败)")
print("\n" + "=" * 60)
# 使用(需要管理员权限)
# security_audit()
小结
| 需求 | 方案 | 模块 |
|---|---|---|
| 查询用户 | ldap3 / PowerShell | search_users / Get-ADUser |
| 创建用户 | ldap3 / PowerShell | conn.add / New-ADUser |
| 重置密码 | ldap3 / PowerShell | modify_password / Set-ADAccountPassword |
| 解锁账户 | PowerShell | Unlock-ADAccount |
| 组管理 | ldap3 / PowerShell | MODIFY_ADD / Add-ADGroupMember |
| 安全审计 | PowerShell | Search-ADAccount |
| 软件部署 | GPO | New-GPO + New-GPLink |
| 批量导入 | CSV + 循环 | bulk_create_users_from_csv |
Windows 域管理是 IT 运维的高阶技能。掌握了这些自动化方法,你可以在几分钟内完成原来需要数天的工作。
这个系列到目前为止已经覆盖了 Windows Python 运维的核心场景:
- Python vs Linux 运维的 5 个坑
- WMI + COM 实战手册
- 注册表自动化运维
- 事件日志分析
- 补丁管理
- 磁盘空间监控
- AD 域管理(本文)
到此这篇关于Python脚本实现自动化管理Windows域的文章就介绍到这了,更多相关Python管理Windows域内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
