python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python远程控制电脑

Python实现局域网远程控制电脑

作者:hvinsion

这篇文章主要为大家详细介绍了如何利用Python编写一个工具,可以实现远程控制局域网电脑关机,重启,注销等功能,感兴趣的小伙伴可以参考一下

1.简介

一款由Python可以远程控制局域网电脑关机、重启、注销、锁定 、休眠、退出登录甚至能操作远程电脑Cmd终端命令的一款工具。资源及源码已打包,大家可自行下载。

工具分为1.0以及2.0版本,2.0版本在1.0终端命令行模式更改为网页界面化操作,可利用手机等多终端设备控制,更加美观实用;优化了端口设置,添加了条件判断;可结合“Radmin_LAN”软件,实现异地控制。工具可用Pyinstaller打包成无窗口后台静默运行。

默认账号:root

默认密码:123456

2. 运行效果

1.0版本运行效果:

2.0版本运行效果:

PC端页面效果:

服务端页面:

客户端登录页面:

客户端操作页面:

手机移动端展示效果:

3. 1.0版本相关源码

服务端server.py

import socket
import keyboard
import subprocess
import threading
import logging
import queue
 '''
import win32gui
import win32con
 
 
# 获取当前窗口句柄
hwnd = win32gui.GetForegroundWindow()
 
# 设置窗口属性,隐藏窗口
win32gui.ShowWindow(hwnd, win32con.SW_HIDE)
'''
 
# 配置日志
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
 
print("客户端登录账号:root,密码:123456,F3关闭服务器")
print()
ip1 = '192.168.137.1'  # 默认ip地址
print(f"请输入服务器IP地址(电脑的IPv4地址),不输入默认为 {ip1}")
 
# 超时设置,15秒
def get_input(prompt, timeout, result_queue):
    try:
        user_input = input(prompt)
        if user_input.strip() == "":
            result_queue.put(None)  # 如果用户直接回车,返回None
        else:
            result_queue.put(user_input)
    except Exception as e:
        logging.error(f"输入过程中发生错误: {e}")
 
# 创建一个线程来获取输入
print("超时时间---15秒")
print()
result_queue = queue.Queue()
input_thread = threading.Thread(target=get_input, args=("请输入服务器IP地址:", 15, result_queue))
input_thread.start()
 
# 等待输入,如果超时则返回默认值
input_thread.join(15)
if not result_queue.empty():
    ip = result_queue.get()
    if ip is None:
        ip = ip1  # 如果用户直接回车,使用默认值
else:
    ip = ip1
 
print(f"最终IP地址: {ip}")
 
'''
def get_ipv4_address():
    hostname = socket.gethostname()
    ip_address = socket.gethostbyname(hostname)
    return ip_address
 
ip=get_ipv4_address()
print(ip)
'''
 
 
# 创建一个 TCP/IP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
# 设置 SO_REUSEADDR 选项
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
# 绑定服务器地址和端口
server_address = (ip, 5000)
server_socket.bind(server_address)
 
# 监听连接
server_socket.listen(5)  # 支持最多5个并发连接
 
print('服务器正在等待连接...')
 
 
 
def close_server():
    print('服务器已关闭')
    server_socket.close()
    exit()
 
# 监听 F3 键关闭服务器
keyboard.add_hotkey('F3', close_server)
 
def handle_client(client_socket, client_address):
    try:
        print('连接来自:', client_address)
 
        # 接收账号和密码
        data = client_socket.recv(1024)
        logging.debug(f"接收到的数据: {data.decode()}")
 
        # 解析账号和密码
        try:
            account, password = data.decode().split(':')
        except ValueError:
            logging.error("账号和密码格式错误")
            message = '账号和密码格式错误!'
            client_socket.send(message.encode())
            return
 
        # 验证账号和密码
        if account == 'root' and password == '123456':
            # 发送成功消息
            message = '登录成功!'
            client_socket.send(message.encode())
 
            # 接收客户端请求
            while True:
                request = client_socket.recv(1024).decode()
                logging.debug(f"接收到的请求: {request}")
 
 
 
                if request == '1':
                    # 关机
                    subprocess.call('shutdown /s /t 0', shell=True)
                elif request == '2':
                    # 重启
                    subprocess.call('shutdown /r /t 0', shell=True)
                elif request == '3':
                    # 休眠
                    subprocess.call('rundll32.exe powrprof.dll,SetSuspendState 0,1,0', shell=True)
                elif request == '4':
                    # 锁定
                    subprocess.call('rundll32.exe user32.dll,LockWorkStation', shell=True)
                elif request == '0':
                    # 退出登录
                    print(client_address, "退出登录")
                    break
                elif request == '5':
                    # 等待用户输入
                    user_input = 1
                    while True:
                        # 执行cmd命令
                        command = client_socket.recv(1024).decode()
                        logging.debug(f"接收到的命令:{command}")
                        try:
                            # 执行命令
                            result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
                                                      stderr=subprocess.PIPE, text=True)
                            # 获取命令的输出
                            output, error = result.communicate()
                            # 打印命令的输出到本地控制台
                            logging.debug(f"{output}")
                            # 发送命令的输出
                            if error:
                                output = " "
                            client_socket.sendall(output.encode())
                            logging.debug(f"{error}")
                            # 发送命令的错误
                            client_socket.sendall(error.encode())
                            logging.debug("命令结果已发送")
                        except subprocess.CalledProcessError as e:
                            logging.error(f"{e.stderr}")
                            # 发送错误信息
                            client_socket.sendall(str(e).encode())
                        finally:
                            # 发送一个结束标记
                            client_socket.sendall(' '.encode())
                            # 等待用户输入
                        #接收客户端的user_input值
                        user_input = client_socket.recv(1024).decode()
                        logging.debug(f"接收到的请求: {user_input}")
                        if user_input == '1':
                            # 继续执行命令
                            continue
                        elif user_input == '2':
                            print(client_address, "退出登录")
                            # 退出循环
                            break
                        else:
                            # 无效输入
                            client_socket.sendall('无效输入'.encode())
                            # 继续执行命令
                            continue
 
                    else:
                        # 无效输入
                        client_socket.sendall('无效输入'.encode())
                else:
                    # 无效请求
                    message = '无效请求!'
                    client_socket.send(message.encode())
        else:
            # 发送失败消息
            message = '账号或密码错误!'
            client_socket.send(message.encode())
    except Exception as e:
        logging.error(f"处理客户端连接时发生错误: {e}")
    finally:
        # 关闭连接
        client_socket.close()
 
while True:
    try:
        # 等待客户端连接
        client_socket, client_address = server_socket.accept()
        logging.debug(f"接受到来自 {client_address} 的连接")
        # 创建新线程处理客户端连接
        client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
        client_thread.start()
    except OSError as e:
        if e.errno == 10038:
            print('服务器已关闭')
            break
        else:
            raise
 
input("按回车键退出...")

客户端client.py

import socket

def run_script():
    try:
        print()
        ip1 = '192.168.137.1'
        print("请输入服务器的IP地址,不输入默认ip为", ip1)
        print()
        ip = input("请输入服务器的IP地址:") or ip1
        print()
        # 服务器地址和端口
        server_address = (ip, 5000)
 
        # 创建一个 TCP/IP socket
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
        try:
            # 连接到服务器
            client_socket.connect(server_address)
            print('已连接到服务器')
 
            # 获取用户输入的账号和密码
            account = input('请输入账号:') or 'root'
            password = input('请输入密码:') or '123456'
 
            # 发送账号和密码
            message = f'{account}:{password}'
            client_socket.sendall(message.encode())
 
            # 接收服务器响应
            response = client_socket.recv(1024).decode()
            print('服务器响应:', response)
 
            if response == '登录成功!':
                print('登录成功')
                print()
                # 发送默认请求
                print("[0]退出登录---[1]关机---[2]重启---[3]休眠")
                print("[4]锁定---[5]执行cmd命令")
                print()
                request = input('输入数字: ') or "5"  # 你可以修改这个请求
                if request in ['0', '1', '2', '3', '4', '5']:
                    client_socket.sendall(request.encode())
                    print("请求已发送")
                    if request == '0':
                        print("退出登录成功")
                    elif request == '5':
                        while True:
                            # 获取用户输入的命令
                            print()
                            command = input('请输入要执行的命令:') or "echo %date% %time%"
                            # 发送命令
                            client_socket.sendall(command.encode())
                            # 设置超时时间
                            client_socket.settimeout(60)
 
                            try:
                                print()
                                print("接收超时时间为60秒")
                                print()
                                # 接收命令的输出
                                data = b''
                                count = 0
                                while count < 2:
                                    packet = client_socket.recv(4096)
                                    data += packet
                                    count += 1
 
                                output = data.decode('utf-8', errors='ignore')
                                print(output)
 
                            except socket.timeout:
                                print("接收命令的输出超时")
 
                            # 等待用户输入
 
                            print()
                            user_input = input("是否继续执行命令?[1]继续---[2]退出: ") or "1"
                            if user_input == '1':
                                # 发送
                                client_socket.sendall(user_input.encode())
                                # 继续执行命令
                                continue
                            elif user_input == '2':
                                print("退出登录成功")
                                break
 
                            else:
                                # 无效输入
                                print('无效输入,继续执行命令')
                                # 发送
                                client_socket.sendall(user_input.encode())
                                # 继续执行命令
                                continue
 
                else:
                    print("无效的请求")
            else:
                print('登录失败')
 
        finally:
            # 关闭连接
            client_socket.close()
 
        user_choice = input("输入1继续运行,输入2结束代码: ")
        if user_choice == '2':
            print("结束代码")
            return
        elif user_choice == '1':
            print("继续运行")
            run_script()
        else:
            print("无效输入,默认退出代码")
 
    except Exception as e:
        print(f"An error occurred: {e}")
 
    else:
        print("Script completed successfully.")
 
 
run_script()

4. 2.0版本相关源码

from flask import Flask, request, render_template_string, jsonify
import subprocess
import socket
import os 
app = Flask(__name__)
 
def get_local_ip():
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            # 连接到一个公网DNS服务器,不发送任何数据,只是为了获取本地IP
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except:
        return None  # 如果无法获取IP,返回None
 
def load_server_ip():
    ip_file = 'server_ip.txt'
    if not os.path.exists(ip_file):
        return None
    with open(ip_file, 'r') as f:
        ip = f.read().strip()
        return ip if ip else None
 
def save_server_ip(ip):
    ip_file = 'server_ip.txt'
    with open(ip_file, 'w') as f:
        f.write(ip)
 
def ping_ip(ip):
    """
    Ping指定的IP,返回True如果IP可达,否则返回False
    """
    try:
        # 在Windows系统上使用-4参数强制使用IPv4
        output = subprocess.check_output(['ping', '-4', '-n', '1', ip], stderr=subprocess.STDOUT, universal_newlines=True)
        if "unreachable" in output.lower():
            return False
        return True
    except subprocess.CalledProcessError:
        return False
 
def bind_server(host_ip):
    try:
        print(f"尝试绑定到IP: {host_ip}")
        app.run(host=host_ip, port=80)
    except Exception as e:
        print(f"绑定到 {host_ip} 失败: {e}")
        return False
    return True
 
def main():
    # 尝试从server_ip.txt读取IP
    server_ip = load_server_ip()
    if server_ip:
        print(f"从文件加载服务器IP: {server_ip}")
        if not ping_ip(server_ip):
            print(f"Ping测试失败,IP {server_ip} 不可用")
            server_ip = None
        else:
            print(f"Ping测试成功,IP {server_ip} 可用")
    else:
        server_ip = None  # 重置server_ip,因为文件不存在或内容为空
 
    # 如果server_ip.txt中的IP不可用,尝试获取本机IP
    if not server_ip:
        server_ip = get_local_ip()
        if server_ip:
            print(f"获取到服务器IP: {server_ip}")
            if not ping_ip(server_ip):
                print(f"Ping测试失败,IP {server_ip} 不可用")
                server_ip = None
            else:
                print(f"Ping测试成功,IP {server_ip} 可用")
                save_server_ip(server_ip)
        else:
            print("无法获取本机IP")
 
    # 尝试绑定到server_ip
    if server_ip:
        if not bind_server(server_ip):
            # 如果绑定失败,尝试绑定到127.0.0.1
            print("绑定到指定IP失败,尝试绑定到127.0.0.1")
            if not bind_server('127.0.0.1'):
                print("绑定到127.0.0.1也失败,终止脚本")
                exit(1)
    else:
        # 如果没有有效的IP,尝试绑定到127.0.0.1
        print("没有有效的IP,尝试绑定到127.0.0.1")
        if not bind_server('127.0.0.1'):
            print("绑定到127.0.0.1失败,终止脚本")
            exit(1)
 
@app.route('/', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        data = request.form
        if 'username' in data and 'password' in data:
            username = data['username']
            password = data['password']
            print(f"Received username: {username}, password: {password}")
 
            if username == 'root' and password == '123456': # 这里是用户名和密码,请自行修改。
                return render_template_string('''
                    <!DOCTYPE html>
                    <html>
                    <head>
                        <title>登录成功</title>
                        <meta name="viewport" content="width=device-width, initial-scale=1.0">
                        <style>
                            body {
                                font-family: Arial, sans-serif;
                                margin: 20px;
                            }
                            h1, h2 {
                                text-align: center;
                            }
                            /* 按钮样式 */
                            .btn {
                                padding: 10px 20px;
                                font-size: 16px;
                                margin: 5px;
                                border: none;
                                border-radius: 4px;
                                cursor: pointer;
                                background-color: #007bff;
                                color: white;
                            }
                            .btn:hover {
                                background-color: #0056b3;
                            }
                            /* 输入框样式 */
                            #command-input {
                                width: 100%;
                                height: 100px;
                                font-size: 16px;
                                padding: 10px;
                                resize: vertical;
                                border: 1px solid #ccc;
                                border-radius: 4px;
                                box-sizing: border-box;
                            }
                            /* 结果显示区域 */
                            #result {
                                margin-top: 20px;
                                padding: 10px;
                                border: 1px solid #ddd;
                                border-radius: 4px;
                                background-color: #f9f9f9;
                                white-space: pre-wrap; /* 保留换行 */
                            }
                            /* 执行按钮样式 */
                            #execute-button {
                                float: right;
                                margin-top: 10px;
                            }
                            /* 响应式设计 */
                            @media (max-width: 600px) {
                                .btn {
                                    width: 100%;
                                    font-size: 14px;
                                }
                                #command-input {
                                    height: 100px;
                                    font-size: 14px;
                                }
                                #execute-button {
                                    width: 100%;
                                    float: none;
                                }
                                #result {
                                    font-size: 14px;
                                }
                            }
                        </style>
                    </head>
                    <body>
                        <h1>登录成功</h1>
                        <div style="text-align: center;">
                            <button class="btn">退出登录</button>
                            <button class="btn">关机</button>
                            <button class="btn">重启</button>
                            <button class="btn">休眠</button>
                            <button class="btn">锁定</button>
                        </div>
                        <br>
                        <h2>执行命令</h2>
                        <textarea id="command-input" placeholder="请输入命令"></textarea>
                        <button id="execute-button" class="btn">执行</button>
                        <h3>结果:</h3>
                        <div id="result"></div>
 
                        <script>
                            function logout() {
                                fetch('/logout', { method: 'GET' })
                                    .then(response => response.json())
                                    .then(data => {
                                        alert(data.message);
                                        window.location.href = '/';
                                    });
                            }
 
                            function shutdown() {
                                fetch('/shutdown', { method: 'GET' })
                                    .then(response => response.json())
                                    .then(data => alert(data.message));
                            }
 
                            function restart() {
                                fetch('/restart', { method: 'GET' })
                                    .then(response => response.json())
                                    .then(data => alert(data.message));
                            }
 
                            function sleep() {
                                fetch('/sleep', { method: 'GET' })
                                    .then(response => response.json())
                                    .then(data => alert(data.message));
                            }
 
                            function lock() {
                                fetch('/lock', { method: 'GET' })
                                    .then(response => response.json())
                                    .then(data => alert(data.message));
                            }
 
                            function executeCommand() {
                                const command = document.getElementById('command-input').value;
                                fetch('/execute', {
                                    method: 'POST',
                                    headers: {
                                        'Content-Type': 'application/json'
                                    },
                                    body: JSON.stringify({ command: command })
                                })
                                .then(response => response.json())
                                .then(data => {
                                    document.getElementById('result').innerText = data.output;
                                })
                                .catch(error => {
                                    document.getElementById('result').innerText = '执行失败';
                                });
                            }
                        </script>
                    </body>
                    </html>
                ''')
            else:
                return jsonify({"message": "登录失败"}), 401
        else:
            return jsonify({"message": "缺少用户名或密码"}), 400
    else:
        return render_template_string('''
            <!DOCTYPE html>
            <html>
            <head>
                <title>登录</title>
                <meta name="viewport" content="width=device-width, initial-scale=1.0">
                <style>
                    body {
                        font-family: Arial, sans-serif;
                        margin: 20px;
                    }
                    form {
                        max-width: 400px;
                        margin: 0 auto;
                    }
                    input[type="text"], input[type="password"] {
                        width: 100%;
                        padding: 10px;
                        margin: 10px 0;
                        border: 1px solid #ccc;
                        border-radius: 4px;
                        box-sizing: border-box;
                    }
                    input[type="submit"] {
                        width: 100%;
                        padding: 10px;
                        background-color: #28a745;
                        color: white;
                        border: none;
                        border-radius: 4px;
                        cursor: pointer;
                    }
                    input[type="submit"]:hover {
                        background-color: #218838;
                    }
                    @media (max-width: 600px) {
                        form {
                            max-width: 100%;
                        }
                    }
                </style>
            </head>
            <body>
                <h1>登录</h1>
                <form method="post">
                    <input type="text" name="username" placeholder="用户名"><br>
                    <input type="password" name="password" placeholder="密码"><br>
                    <input type="submit" value="登录">
                </form>
            </body>
            </html>
        ''')
 
@app.route('/logout', methods=['GET'])
def logout():
    return jsonify({"message": "退出登录成功"}), 200
 
@app.route('/shutdown', methods=['GET'])
def shutdown():
    try:
        subprocess.run(["shutdown", "/s", "/t", "1"], check=True)
        return jsonify({"message": "关机成功"}), 200
    except subprocess.CalledProcessError as e:
        return jsonify({"message": "关机失败", "error": str(e)}), 500
 
@app.route('/restart', methods=['GET'])
def restart():
    try:
        subprocess.run(["shutdown", "/r", "/t", "1"], check=True)
        return jsonify({"message": "重启成功"}), 200
    except subprocess.CalledProcessError as e:
        return jsonify({"message": "重启失败", "error": str(e)}), 500
 
@app.route('/sleep', methods=['GET'])
def sleep():
    try:
        subprocess.run(["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"], check=True)
        return jsonify({"message": "休眠成功"}), 200
    except subprocess.CalledProcessError as e:
        return jsonify({"message": "休眠失败", "error": str(e)}), 500
 
@app.route('/lock', methods=['GET'])
def lock():
    try:
        subprocess.run(["rundll32", "user32.dll,LockWorkStation"], check=True)
        return jsonify({"message": "锁定成功"}), 200
    except subprocess.CalledProcessError as e:
        return jsonify({"message": "锁定失败", "error": str(e)}), 500
 
@app.route('/execute', methods=['POST'])
def execute():
    data = request.get_json()
    command = data.get('command', '')
    try:
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        output = result.stdout + result.stderr
        return jsonify({"output": output}), 200
    except Exception as e:
        return jsonify({"output": f"执行失败: {str(e)}"}), 500
 
if __name__ == '__main__':
    main()

到此这篇关于Python实现局域网远程控制电脑的文章就介绍到这了,更多相关Python远程控制电脑内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文