Python实现局域网远程控制电脑
作者:hvinsion
这篇文章主要为大家详细介绍了如何利用Python编写一个工具,可以实现远程控制局域网电脑关机,重启,注销等功能,感兴趣的小伙伴可以参考一下
1.简介
一款由Python可以远程控制局域网电脑关机、重启、注销、锁定 、休眠、退出登录甚至能操作远程电脑Cmd终端命令的一款工具。资源及源码已打包,大家可自行下载。
工具分为1.0以及2.0版本,2.0版本在1.0终端命令行模式更改为网页界面化操作,可利用手机等多终端设备控制,更加美观实用;优化了端口设置,添加了条件判断;可结合“Radmin_LAN”软件,实现异地控制。工具可用Pyinstaller打包成无窗口后台静默运行。
默认账号:root
默认密码:123456
2. 运行效果
1.0版本运行效果:
2.0版本运行效果:
PC端页面效果:
服务端页面:
客户端登录页面:
客户端操作页面:
手机移动端展示效果:
3. 1.0版本相关源码
服务端server.py
import socket import keyboard import subprocess import threading import logging import queue ''' import win32gui import win32con # 获取当前窗口句柄 hwnd = win32gui.GetForegroundWindow() # 设置窗口属性,隐藏窗口 win32gui.ShowWindow(hwnd, win32con.SW_HIDE) ''' # 配置日志 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') print("客户端登录账号:root,密码:123456,F3关闭服务器") print() ip1 = '192.168.137.1' # 默认ip地址 print(f"请输入服务器IP地址(电脑的IPv4地址),不输入默认为 {ip1}") # 超时设置,15秒 def get_input(prompt, timeout, result_queue): try: user_input = input(prompt) if user_input.strip() == "": result_queue.put(None) # 如果用户直接回车,返回None else: result_queue.put(user_input) except Exception as e: logging.error(f"输入过程中发生错误: {e}") # 创建一个线程来获取输入 print("超时时间---15秒") print() result_queue = queue.Queue() input_thread = threading.Thread(target=get_input, args=("请输入服务器IP地址:", 15, result_queue)) input_thread.start() # 等待输入,如果超时则返回默认值 input_thread.join(15) if not result_queue.empty(): ip = result_queue.get() if ip is None: ip = ip1 # 如果用户直接回车,使用默认值 else: ip = ip1 print(f"最终IP地址: {ip}") ''' def get_ipv4_address(): hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) return ip_address ip=get_ipv4_address() print(ip) ''' # 创建一个 TCP/IP socket server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置 SO_REUSEADDR 选项 server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 绑定服务器地址和端口 server_address = (ip, 5000) server_socket.bind(server_address) # 监听连接 server_socket.listen(5) # 支持最多5个并发连接 print('服务器正在等待连接...') def close_server(): print('服务器已关闭') server_socket.close() exit() # 监听 F3 键关闭服务器 keyboard.add_hotkey('F3', close_server) def handle_client(client_socket, client_address): try: print('连接来自:', client_address) # 接收账号和密码 data = client_socket.recv(1024) logging.debug(f"接收到的数据: {data.decode()}") # 解析账号和密码 try: account, password = data.decode().split(':') except ValueError: logging.error("账号和密码格式错误") message = '账号和密码格式错误!' client_socket.send(message.encode()) return # 验证账号和密码 if account == 'root' and password == '123456': # 发送成功消息 message = '登录成功!' client_socket.send(message.encode()) # 接收客户端请求 while True: request = client_socket.recv(1024).decode() logging.debug(f"接收到的请求: {request}") if request == '1': # 关机 subprocess.call('shutdown /s /t 0', shell=True) elif request == '2': # 重启 subprocess.call('shutdown /r /t 0', shell=True) elif request == '3': # 休眠 subprocess.call('rundll32.exe powrprof.dll,SetSuspendState 0,1,0', shell=True) elif request == '4': # 锁定 subprocess.call('rundll32.exe user32.dll,LockWorkStation', shell=True) elif request == '0': # 退出登录 print(client_address, "退出登录") break elif request == '5': # 等待用户输入 user_input = 1 while True: # 执行cmd命令 command = client_socket.recv(1024).decode() logging.debug(f"接收到的命令:{command}") try: # 执行命令 result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # 获取命令的输出 output, error = result.communicate() # 打印命令的输出到本地控制台 logging.debug(f"{output}") # 发送命令的输出 if error: output = " " client_socket.sendall(output.encode()) logging.debug(f"{error}") # 发送命令的错误 client_socket.sendall(error.encode()) logging.debug("命令结果已发送") except subprocess.CalledProcessError as e: logging.error(f"{e.stderr}") # 发送错误信息 client_socket.sendall(str(e).encode()) finally: # 发送一个结束标记 client_socket.sendall(' '.encode()) # 等待用户输入 #接收客户端的user_input值 user_input = client_socket.recv(1024).decode() logging.debug(f"接收到的请求: {user_input}") if user_input == '1': # 继续执行命令 continue elif user_input == '2': print(client_address, "退出登录") # 退出循环 break else: # 无效输入 client_socket.sendall('无效输入'.encode()) # 继续执行命令 continue else: # 无效输入 client_socket.sendall('无效输入'.encode()) else: # 无效请求 message = '无效请求!' client_socket.send(message.encode()) else: # 发送失败消息 message = '账号或密码错误!' client_socket.send(message.encode()) except Exception as e: logging.error(f"处理客户端连接时发生错误: {e}") finally: # 关闭连接 client_socket.close() while True: try: # 等待客户端连接 client_socket, client_address = server_socket.accept() logging.debug(f"接受到来自 {client_address} 的连接") # 创建新线程处理客户端连接 client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address)) client_thread.start() except OSError as e: if e.errno == 10038: print('服务器已关闭') break else: raise input("按回车键退出...")
客户端client.py
import socket def run_script(): try: print() ip1 = '192.168.137.1' print("请输入服务器的IP地址,不输入默认ip为", ip1) print() ip = input("请输入服务器的IP地址:") or ip1 print() # 服务器地址和端口 server_address = (ip, 5000) # 创建一个 TCP/IP socket client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # 连接到服务器 client_socket.connect(server_address) print('已连接到服务器') # 获取用户输入的账号和密码 account = input('请输入账号:') or 'root' password = input('请输入密码:') or '123456' # 发送账号和密码 message = f'{account}:{password}' client_socket.sendall(message.encode()) # 接收服务器响应 response = client_socket.recv(1024).decode() print('服务器响应:', response) if response == '登录成功!': print('登录成功') print() # 发送默认请求 print("[0]退出登录---[1]关机---[2]重启---[3]休眠") print("[4]锁定---[5]执行cmd命令") print() request = input('输入数字: ') or "5" # 你可以修改这个请求 if request in ['0', '1', '2', '3', '4', '5']: client_socket.sendall(request.encode()) print("请求已发送") if request == '0': print("退出登录成功") elif request == '5': while True: # 获取用户输入的命令 print() command = input('请输入要执行的命令:') or "echo %date% %time%" # 发送命令 client_socket.sendall(command.encode()) # 设置超时时间 client_socket.settimeout(60) try: print() print("接收超时时间为60秒") print() # 接收命令的输出 data = b'' count = 0 while count < 2: packet = client_socket.recv(4096) data += packet count += 1 output = data.decode('utf-8', errors='ignore') print(output) except socket.timeout: print("接收命令的输出超时") # 等待用户输入 print() user_input = input("是否继续执行命令?[1]继续---[2]退出: ") or "1" if user_input == '1': # 发送 client_socket.sendall(user_input.encode()) # 继续执行命令 continue elif user_input == '2': print("退出登录成功") break else: # 无效输入 print('无效输入,继续执行命令') # 发送 client_socket.sendall(user_input.encode()) # 继续执行命令 continue else: print("无效的请求") else: print('登录失败') finally: # 关闭连接 client_socket.close() user_choice = input("输入1继续运行,输入2结束代码: ") if user_choice == '2': print("结束代码") return elif user_choice == '1': print("继续运行") run_script() else: print("无效输入,默认退出代码") except Exception as e: print(f"An error occurred: {e}") else: print("Script completed successfully.") run_script()
4. 2.0版本相关源码
from flask import Flask, request, render_template_string, jsonify import subprocess import socket import os app = Flask(__name__) def get_local_ip(): try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: # 连接到一个公网DNS服务器,不发送任何数据,只是为了获取本地IP s.connect(("8.8.8.8", 53)) return s.getsockname()[0] except: return None # 如果无法获取IP,返回None def load_server_ip(): ip_file = 'server_ip.txt' if not os.path.exists(ip_file): return None with open(ip_file, 'r') as f: ip = f.read().strip() return ip if ip else None def save_server_ip(ip): ip_file = 'server_ip.txt' with open(ip_file, 'w') as f: f.write(ip) def ping_ip(ip): """ Ping指定的IP,返回True如果IP可达,否则返回False """ try: # 在Windows系统上使用-4参数强制使用IPv4 output = subprocess.check_output(['ping', '-4', '-n', '1', ip], stderr=subprocess.STDOUT, universal_newlines=True) if "unreachable" in output.lower(): return False return True except subprocess.CalledProcessError: return False def bind_server(host_ip): try: print(f"尝试绑定到IP: {host_ip}") app.run(host=host_ip, port=80) except Exception as e: print(f"绑定到 {host_ip} 失败: {e}") return False return True def main(): # 尝试从server_ip.txt读取IP server_ip = load_server_ip() if server_ip: print(f"从文件加载服务器IP: {server_ip}") if not ping_ip(server_ip): print(f"Ping测试失败,IP {server_ip} 不可用") server_ip = None else: print(f"Ping测试成功,IP {server_ip} 可用") else: server_ip = None # 重置server_ip,因为文件不存在或内容为空 # 如果server_ip.txt中的IP不可用,尝试获取本机IP if not server_ip: server_ip = get_local_ip() if server_ip: print(f"获取到服务器IP: {server_ip}") if not ping_ip(server_ip): print(f"Ping测试失败,IP {server_ip} 不可用") server_ip = None else: print(f"Ping测试成功,IP {server_ip} 可用") save_server_ip(server_ip) else: print("无法获取本机IP") # 尝试绑定到server_ip if server_ip: if not bind_server(server_ip): # 如果绑定失败,尝试绑定到127.0.0.1 print("绑定到指定IP失败,尝试绑定到127.0.0.1") if not bind_server('127.0.0.1'): print("绑定到127.0.0.1也失败,终止脚本") exit(1) else: # 如果没有有效的IP,尝试绑定到127.0.0.1 print("没有有效的IP,尝试绑定到127.0.0.1") if not bind_server('127.0.0.1'): print("绑定到127.0.0.1失败,终止脚本") exit(1) @app.route('/', methods=['GET', 'POST']) def login(): if request.method == 'POST': data = request.form if 'username' in data and 'password' in data: username = data['username'] password = data['password'] print(f"Received username: {username}, password: {password}") if username == 'root' and password == '123456': # 这里是用户名和密码,请自行修改。 return render_template_string(''' <!DOCTYPE html> <html> <head> <title>登录成功</title> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style> body { font-family: Arial, sans-serif; margin: 20px; } h1, h2 { text-align: center; } /* 按钮样式 */ .btn { padding: 10px 20px; font-size: 16px; margin: 5px; border: none; border-radius: 4px; cursor: pointer; background-color: #007bff; color: white; } .btn:hover { background-color: #0056b3; } /* 输入框样式 */ #command-input { width: 100%; height: 100px; font-size: 16px; padding: 10px; resize: vertical; border: 1px solid #ccc; border-radius: 4px; box-sizing: border-box; } /* 结果显示区域 */ #result { margin-top: 20px; padding: 10px; border: 1px solid #ddd; border-radius: 4px; background-color: #f9f9f9; white-space: pre-wrap; /* 保留换行 */ } /* 执行按钮样式 */ #execute-button { float: right; margin-top: 10px; } /* 响应式设计 */ @media (max-width: 600px) { .btn { width: 100%; font-size: 14px; } #command-input { height: 100px; font-size: 14px; } #execute-button { width: 100%; float: none; } #result { font-size: 14px; } } </style> </head> <body> <h1>登录成功</h1> <div style="text-align: center;"> <button class="btn">退出登录</button> <button class="btn">关机</button> <button class="btn">重启</button> <button class="btn">休眠</button> <button class="btn">锁定</button> </div> <br> <h2>执行命令</h2> <textarea id="command-input" placeholder="请输入命令"></textarea> <button id="execute-button" class="btn">执行</button> <h3>结果:</h3> <div id="result"></div> <script> function logout() { fetch('/logout', { method: 'GET' }) .then(response => response.json()) .then(data => { alert(data.message); window.location.href = '/'; }); } function shutdown() { fetch('/shutdown', { method: 'GET' }) .then(response => response.json()) .then(data => alert(data.message)); } function restart() { fetch('/restart', { method: 'GET' }) .then(response => response.json()) .then(data => alert(data.message)); } function sleep() { fetch('/sleep', { method: 'GET' }) .then(response => response.json()) .then(data => alert(data.message)); } function lock() { fetch('/lock', { method: 'GET' }) .then(response => response.json()) .then(data => alert(data.message)); } function executeCommand() { const command = document.getElementById('command-input').value; fetch('/execute', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ command: command }) }) .then(response => response.json()) .then(data => { document.getElementById('result').innerText = data.output; }) .catch(error => { document.getElementById('result').innerText = '执行失败'; }); } </script> </body> </html> ''') else: return jsonify({"message": "登录失败"}), 401 else: return jsonify({"message": "缺少用户名或密码"}), 400 else: return render_template_string(''' <!DOCTYPE html> <html> <head> <title>登录</title> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style> body { font-family: Arial, sans-serif; margin: 20px; } form { max-width: 400px; margin: 0 auto; } input[type="text"], input[type="password"] { width: 100%; padding: 10px; margin: 10px 0; border: 1px solid #ccc; border-radius: 4px; box-sizing: border-box; } input[type="submit"] { width: 100%; padding: 10px; background-color: #28a745; color: white; border: none; border-radius: 4px; cursor: pointer; } input[type="submit"]:hover { background-color: #218838; } @media (max-width: 600px) { form { max-width: 100%; } } </style> </head> <body> <h1>登录</h1> <form method="post"> <input type="text" name="username" placeholder="用户名"><br> <input type="password" name="password" placeholder="密码"><br> <input type="submit" value="登录"> </form> </body> </html> ''') @app.route('/logout', methods=['GET']) def logout(): return jsonify({"message": "退出登录成功"}), 200 @app.route('/shutdown', methods=['GET']) def shutdown(): try: subprocess.run(["shutdown", "/s", "/t", "1"], check=True) return jsonify({"message": "关机成功"}), 200 except subprocess.CalledProcessError as e: return jsonify({"message": "关机失败", "error": str(e)}), 500 @app.route('/restart', methods=['GET']) def restart(): try: subprocess.run(["shutdown", "/r", "/t", "1"], check=True) return jsonify({"message": "重启成功"}), 200 except subprocess.CalledProcessError as e: return jsonify({"message": "重启失败", "error": str(e)}), 500 @app.route('/sleep', methods=['GET']) def sleep(): try: subprocess.run(["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"], check=True) return jsonify({"message": "休眠成功"}), 200 except subprocess.CalledProcessError as e: return jsonify({"message": "休眠失败", "error": str(e)}), 500 @app.route('/lock', methods=['GET']) def lock(): try: subprocess.run(["rundll32", "user32.dll,LockWorkStation"], check=True) return jsonify({"message": "锁定成功"}), 200 except subprocess.CalledProcessError as e: return jsonify({"message": "锁定失败", "error": str(e)}), 500 @app.route('/execute', methods=['POST']) def execute(): data = request.get_json() command = data.get('command', '') try: result = subprocess.run(command, shell=True, capture_output=True, text=True) output = result.stdout + result.stderr return jsonify({"output": output}), 200 except Exception as e: return jsonify({"output": f"执行失败: {str(e)}"}), 500 if __name__ == '__main__': main()
到此这篇关于Python实现局域网远程控制电脑的文章就介绍到这了,更多相关Python远程控制电脑内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!