python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python检查系统服务运行状态

Python系统监控之跨平台检查系统服务运行状态的完整指南

作者:零日失眠者

系统服务是操作系统和应用程序正常运行的基础,本文将和大家分享一个系统服务状态检查的Python实用脚本,可以跨平台检查系统服务的运行状态,并提供详细的报告和告警功能,有需要的可以参考下

简介

系统服务是操作系统和应用程序正常运行的基础。无论是Web服务器、数据库服务还是其他后台进程,服务的状态直接影响系统的可用性和稳定性。定期检查关键服务的运行状态,可以及时发现服务异常并采取相应措施。本文将介绍一个实用的Python脚本——系统服务状态检查工具,它可以跨平台检查系统服务的运行状态,并提供详细的报告和告警功能。

功能介绍

这个系统服务状态检查工具具有以下核心功能:

应用场景

这个工具适用于以下场景:

报错处理

脚本包含了完善的错误处理机制:

代码实现

import os
import sys
import subprocess
import argparse
import json
import time
from datetime import datetime
import platform
import smtplib
from email.mime.text import MimeText
from email.mime.multipart import MimeMultipart

class ServiceStatusChecker:
    def __init__(self):
        self.os_type = platform.system().lower()
        self.services = []
        self.results = []
        self.errors = []
        
    def add_service(self, name, display_name=None, description=None):
        """添加要检查的服务"""
        service = {
            'name': name,
            'display_name': display_name or name,
            'description': description or ''
        }
        self.services.append(service)
        
    def load_services_from_config(self, config_file):
        """从配置文件加载服务列表"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
                services = config.get('services', [])
                for service in services:
                    self.add_service(
                        service['name'],
                        service.get('display_name'),
                        service.get('description')
                    )
            return True
        except Exception as e:
            self.errors.append(f"加载配置文件失败: {e}")
            return False
            
    def check_windows_service(self, service_name):
        """检查Windows服务状态"""
        try:
            # 使用sc query命令检查服务状态
            cmd = ['sc', 'query', service_name]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            
            if result.returncode != 0:
                return {
                    'name': service_name,
                    'status': 'unknown',
                    'error': result.stderr.strip()
                }
                
            output = result.stdout.lower()
            
            if 'running' in output:
                status = 'running'
            elif 'stopped' in output:
                status = 'stopped'
            else:
                status = 'unknown'
                
            return {
                'name': service_name,
                'status': status,
                'details': output
            }
        except subprocess.TimeoutExpired:
            return {
                'name': service_name,
                'status': 'timeout',
                'error': '命令执行超时'
            }
        except Exception as e:
            return {
                'name': service_name,
                'status': 'error',
                'error': str(e)
            }
            
    def check_linux_service(self, service_name):
        """检查Linux systemd服务状态"""
        try:
            # 使用systemctl is-active命令检查服务状态
            cmd = ['systemctl', 'is-active', service_name]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            
            if result.returncode == 0:
                status = result.stdout.strip()
            else:
                # 如果命令失败,可能是服务不存在
                status = 'inactive' if 'inactive' in result.stdout.lower() else 'unknown'
                
            # 获取服务详细信息
            details = {}
            try:
                cmd = ['systemctl', 'show', service_name, '--no-pager']
                detail_result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
                if detail_result.returncode == 0:
                    for line in detail_result.stdout.strip().split('\n'):
                        if '=' in line:
                            key, value = line.split('=', 1)
                            details[key] = value
            except:
                pass
                
            return {
                'name': service_name,
                'status': status,
                'details': details
            }
        except subprocess.TimeoutExpired:
            return {
                'name': service_name,
                'status': 'timeout',
                'error': '命令执行超时'
            }
        except Exception as e:
            return {
                'name': service_name,
                'status': 'error',
                'error': str(e)
            }
            
    def check_macos_service(self, service_name):
        """检查macOS launchd服务状态"""
        try:
            # 使用launchctl list命令检查服务状态
            cmd = ['launchctl', 'list']
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            
            if result.returncode != 0:
                return {
                    'name': service_name,
                    'status': 'unknown',
                    'error': result.stderr.strip()
                }
                
            # 解析输出查找服务
            lines = result.stdout.strip().split('\n')[1:]  # 跳过标题行
            service_found = False
            pid = None
            
            for line in lines:
                parts = line.split()
                if len(parts) >= 3 and service_name in parts[2]:
                    service_found = True
                    pid = parts[0] if parts[0] != '-' else None
                    break
                    
            if service_found:
                status = 'running' if pid else 'loaded'
            else:
                status = 'not_found'
                
            return {
                'name': service_name,
                'status': status,
                'pid': pid
            }
        except subprocess.TimeoutExpired:
            return {
                'name': service_name,
                'status': 'timeout',
                'error': '命令执行超时'
            }
        except Exception as e:
            return {
                'name': service_name,
                'status': 'error',
                'error': str(e)
            }
            
    def check_service(self, service_name):
        """检查服务状态(跨平台)"""
        if self.os_type == 'windows':
            return self.check_windows_service(service_name)
        elif self.os_type == 'linux':
            return self.check_linux_service(service_name)
        elif self.os_type == 'darwin':  # macOS
            return self.check_macos_service(service_name)
        else:
            return {
                'name': service_name,
                'status': 'unsupported',
                'error': f'不支持的操作系统: {self.os_type}'
            }
            
    def check_all_services(self):
        """检查所有服务的状态"""
        print(f"开始检查服务状态 (操作系统: {self.os_type})")
        print("=" * 60)
        
        self.results = []
        
        for service in self.services:
            print(f"检查服务: {service['display_name']} ({service['name']})")
            result = self.check_service(service['name'])
            result['display_name'] = service['display_name']
            result['description'] = service['description']
            result['check_time'] = datetime.now().isoformat()
            self.results.append(result)
            
            # 显示结果
            status_display = {
                'running': '✅ 运行中',
                'stopped': '❌ 已停止',
                'inactive': '⏸️  未激活',
                'loaded': '🔄 已加载',
                'not_found': '❓ 未找到',
                'timeout': '⏰ 超时',
                'error': '⚠️  错误',
                'unknown': '❓ 未知'
            }
            
            status_text = status_display.get(result['status'], result['status'])
            print(f"  状态: {status_text}")
            
            if 'error' in result:
                print(f"  错误: {result['error']}")
                
            print()
            
        return self.results
        
    def generate_report(self):
        """生成服务状态报告"""
        report = []
        report.append("=" * 80)
        report.append("系统服务状态检查报告")
        report.append("=" * 80)
        report.append(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"操作系统: {self.os_type}")
        report.append(f"检查服务数: {len(self.services)}")
        report.append("")
        
        # 统计信息
        status_counts = {}
        for result in self.results:
            status = result['status']
            status_counts[status] = status_counts.get(status, 0) + 1
            
        report.append("状态统计:")
        for status, count in status_counts.items():
            report.append(f"  {status}: {count}")
        report.append("")
        
        # 详细结果
        report.append("服务详细信息:")
        report.append("-" * 50)
        for result in self.results:
            status_display = {
                'running': '✅ 运行中',
                'stopped': '❌ 已停止',
                'inactive': '⏸️  未激活',
                'loaded': '🔄 已加载',
                'not_found': '❓ 未找到',
                'timeout': '⏰ 超时',
                'error': '⚠️  错误',
                'unknown': '❓ 未知'
            }
            
            status_text = status_display.get(result['status'], result['status'])
            report.append(f"{result['display_name']} ({result['name']}): {status_text}")
            if result['description']:
                report.append(f"  描述: {result['description']}")
            if 'error' in result:
                report.append(f"  错误: {result['error']}")
            report.append("")
            
        # 错误信息
        if self.errors:
            report.append("错误信息:")
            report.append("-" * 50)
            for error in self.errors:
                report.append(f"  {error}")
            report.append("")
            
        report.append("=" * 80)
        return "\n".join(report)
        
    def save_report(self, filename):
        """保存报告到文件"""
        try:
            report = self.generate_report()
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"报告已保存到: {filename}")
            return True
        except Exception as e:
            print(f"保存报告时出错: {e}")
            return False
            
    def save_json_report(self, filename):
        """保存JSON格式报告"""
        try:
            report_data = {
                'check_time': datetime.now().isoformat(),
                'os_type': self.os_type,
                'services': self.results,
                'errors': self.errors
            }
            
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(report_data, f, indent=2, ensure_ascii=False)
            print(f"JSON报告已保存到: {filename}")
            return True
        except Exception as e:
            print(f"保存JSON报告时出错: {e}")
            return False
            
    def send_alert(self, smtp_config, alert_services=None):
        """发送告警邮件"""
        try:
            # 确定需要告警的服务
            if alert_services is None:
                # 默认告警停止或异常的服务
                alert_services = [
                    r for r in self.results 
                    if r['status'] not in ['running', 'loaded']
                ]
                
            if not alert_services:
                print("没有需要告警的服务")
                return True
                
            # 构建邮件内容
            subject = f"服务状态告警 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            body = "以下服务状态异常:\n\n"
            
            for service in alert_services:
                body += f"服务: {service['display_name']} ({service['name']})\n"
                body += f"状态: {service['status']}\n"
                if 'error' in service:
                    body += f"错误: {service['error']}\n"
                body += "\n"
                
            # 发送邮件
            msg = MimeMultipart()
            msg['From'] = smtp_config['from']
            msg['To'] = smtp_config['to']
            msg['Subject'] = subject
            
            msg.attach(MimeText(body, 'plain', 'utf-8'))
            
            server = smtplib.SMTP(smtp_config['server'], smtp_config['port'])
            server.starttls()
            server.login(smtp_config['username'], smtp_config['password'])
            server.send_message(msg)
            server.quit()
            
            print("告警邮件已发送")
            return True
            
        except Exception as e:
            print(f"发送告警邮件时出错: {e}")
            return False
            
    def continuous_monitor(self, interval=60, max_checks=None):
        """持续监控服务状态"""
        check_count = 0
        
        try:
            while True:
                print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始第 {check_count + 1} 次检查")
                self.check_all_services()
                
                # 显示简要状态
                running_count = len([r for r in self.results if r['status'] == 'running'])
                print(f"运行中服务: {running_count}/{len(self.results)}")
                
                check_count += 1
                
                # 检查是否达到最大检查次数
                if max_checks and check_count >= max_checks:
                    break
                    
                # 等待下次检查
                time.sleep(interval)
                
        except KeyboardInterrupt:
            print("\n\n停止监控...")

def create_sample_config():
    """创建示例配置文件"""
    config = {
        "services": [
            {
                "name": "ssh",
                "display_name": "SSH服务",
                "description": "安全Shell服务"
            },
            {
                "name": "nginx",
                "display_name": "Nginx Web服务器",
                "description": "高性能HTTP和反向代理服务器"
            },
            {
                "name": "mysql",
                "display_name": "MySQL数据库",
                "description": "关系型数据库管理系统"
            }
        ]
    }
    
    try:
        with open('service_config.json', 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2, ensure_ascii=False)
        print("示例配置文件已创建: service_config.json")
        return True
    except Exception as e:
        print(f"创建示例配置文件时出错: {e}")
        return False

def main():
    parser = argparse.ArgumentParser(description="系统服务状态检查工具")
    parser.add_argument("-c", "--config", help="配置文件路径")
    parser.add_argument("-s", "--service", action='append', help="要检查的服务名称(可多次使用)")
    parser.add_argument("-o", "--output", help="保存报告到文件")
    parser.add_argument("-j", "--json", help="保存JSON报告到文件")
    parser.add_argument("-m", "--monitor", type=int, help="持续监控间隔(秒)")
    parser.add_argument("--max-checks", type=int, help="最大检查次数")
    parser.add_argument("--create-config", action="store_true", help="创建示例配置文件")
    parser.add_argument("--smtp-server", help="SMTP服务器地址")
    parser.add_argument("--smtp-port", type=int, default=587, help="SMTP服务器端口")
    parser.add_argument("--smtp-username", help="SMTP用户名")
    parser.add_argument("--smtp-password", help="SMTP密码")
    parser.add_argument("--smtp-from", help="发件人邮箱")
    parser.add_argument("--smtp-to", help="收件人邮箱")
    
    args = parser.parse_args()
    
    # 创建示例配置文件
    if args.create_config:
        create_sample_config()
        return
        
    try:
        checker = ServiceStatusChecker()
        
        # 加载服务列表
        if args.config:
            if not checker.load_services_from_config(args.config):
                print("加载配置文件失败")
                sys.exit(1)
        elif args.service:
            for service_name in args.service:
                checker.add_service(service_name)
        else:
            print("请指定配置文件或服务名称")
            sys.exit(1)
            
        # 检查服务状态
        if args.monitor:
            # 持续监控模式
            checker.continuous_monitor(args.monitor, args.max_checks)
        else:
            # 单次检查模式
            checker.check_all_services()
            
            # 生成报告
            if args.output:
                checker.save_report(args.output)
            elif args.json:
                checker.save_json_report(args.json)
            else:
                print(checker.generate_report())
                
            # 发送告警邮件
            if args.smtp_server and args.smtp_username and args.smtp_password:
                smtp_config = {
                    'server': args.smtp_server,
                    'port': args.smtp_port,
                    'username': args.smtp_username,
                    'password': args.smtp_password,
                    'from': args.smtp_from or args.smtp_username,
                    'to': args.smtp_to
                }
                checker.send_alert(smtp_config)
                
    except KeyboardInterrupt:
        print("\n\n用户中断操作")
    except Exception as e:
        print(f"程序执行出错: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

使用方法

基本使用

# 创建示例配置文件
python service_checker.py --create-config

# 使用配置文件检查服务
python service_checker.py -c service_config.json

# 检查单个服务
python service_checker.py -s nginx

# 检查多个服务
python service_checker.py -s nginx -s mysql -s ssh

# 保存报告到文件
python service_checker.py -c service_config.json -o report.txt

# 保存JSON报告
python service_checker.py -c service_config.json -j report.json

# 持续监控服务状态(每30秒检查一次)
python service_checker.py -c service_config.json -m 30

# 持续监控最多10次
python service_checker.py -c service_config.json -m 30 --max-checks 10

告警功能

# 发送告警邮件
python service_checker.py -c service_config.json \
  --smtp-server smtp.gmail.com \
  --smtp-username your_email@gmail.com \
  --smtp-password your_password \
  --smtp-to admin@example.com

配置文件示例

创建的示例配置文件 service_config.json 内容如下:

{
  "services": [
    {
      "name": "ssh",
      "display_name": "SSH服务",
      "description": "安全Shell服务"
    },
    {
      "name": "nginx",
      "display_name": "Nginx Web服务器",
      "description": "高性能HTTP和反向代理服务器"
    },
    {
      "name": "mysql",
      "display_name": "MySQL数据库",
      "description": "关系型数据库管理系统"
    }
  ]
}

命令行参数说明

使用示例

基本检查

python service_checker.py -c service_config.json

持续监控

python service_checker.py -c service_config.json -m 60 --max-checks 10 -o status.log

带告警的检查

python service_checker.py -c service_config.json \
  --smtp-server smtp.gmail.com \
  --smtp-username admin@example.com \
  --smtp-password app_password \
  --smtp-to ops-team@example.com

总结

这个系统服务状态检查工具提供了一个跨平台的服务监控解决方案,能够检查Windows、Linux和macOS系统上的服务状态。它支持配置文件管理、批量检查、持续监控和告警通知等功能,适用于服务器监控、故障排查和运维自动化等多种场景。通过这个工具,用户可以及时了解关键服务的运行状态,在服务异常时快速响应,确保系统的稳定运行。

以上就是Python系统监控之跨平台检查系统服务运行状态的完整指南的详细内容,更多关于Python检查系统服务运行状态的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文